A community based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "fmt"
8 "log"
9 "strings"
10 "time"
11
12 "Coves/internal/atproto/utils"
13 "Coves/internal/core/comments"
14
15 "github.com/lib/pq"
16)
17
18// Constants for comment validation and processing
19const (
20 // CommentCollection is the lexicon collection identifier for comments
21 CommentCollection = "social.coves.feed.comment"
22
23 // ATProtoScheme is the URI scheme for atProto AT-URIs
24 ATProtoScheme = "at://"
25
26 // MaxCommentContentBytes is the maximum allowed size for comment content
27 // Per lexicon: max 3000 graphemes, ~30000 bytes
28 MaxCommentContentBytes = 30000
29)
30
31// CommentEventConsumer consumes comment-related events from Jetstream
32// Handles CREATE, UPDATE, and DELETE operations for social.coves.feed.comment
33type CommentEventConsumer struct {
34 commentRepo comments.Repository
35 db *sql.DB // Direct DB access for atomic count updates
36}
37
38// NewCommentEventConsumer creates a new Jetstream consumer for comment events
39func NewCommentEventConsumer(
40 commentRepo comments.Repository,
41 db *sql.DB,
42) *CommentEventConsumer {
43 return &CommentEventConsumer{
44 commentRepo: commentRepo,
45 db: db,
46 }
47}
48
49// HandleEvent processes a Jetstream event for comment records
50func (c *CommentEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
51 // We only care about commit events for comment records
52 if event.Kind != "commit" || event.Commit == nil {
53 return nil
54 }
55
56 commit := event.Commit
57
58 // Handle comment record operations
59 if commit.Collection == CommentCollection {
60 switch commit.Operation {
61 case "create":
62 return c.createComment(ctx, event.Did, commit)
63 case "update":
64 return c.updateComment(ctx, event.Did, commit)
65 case "delete":
66 return c.deleteComment(ctx, event.Did, commit)
67 }
68 }
69
70 // Silently ignore other operations and collections
71 return nil
72}
73
74// createComment indexes a new comment from the firehose and updates parent counts
75func (c *CommentEventConsumer) createComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
76 if commit.Record == nil {
77 return fmt.Errorf("comment create event missing record data")
78 }
79
80 // Parse the comment record
81 commentRecord, err := parseCommentRecord(commit.Record)
82 if err != nil {
83 return fmt.Errorf("failed to parse comment record: %w", err)
84 }
85
86 // SECURITY: Validate this is a legitimate comment event
87 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil {
88 log.Printf("🚨 SECURITY: Rejecting comment event: %v", err)
89 return err
90 }
91
92 // Build AT-URI for this comment
93 // Format: at://commenter_did/social.coves.feed.comment/rkey
94 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", repoDID, commit.RKey)
95
96 // Parse timestamp from record
97 createdAt, err := time.Parse(time.RFC3339, commentRecord.CreatedAt)
98 if err != nil {
99 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err)
100 createdAt = time.Now()
101 }
102
103 // Serialize optional JSON fields
104 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord)
105
106 // Build comment entity
107 comment := &comments.Comment{
108 URI: uri,
109 CID: commit.CID,
110 RKey: commit.RKey,
111 CommenterDID: repoDID, // Comment comes from user's repository
112 RootURI: commentRecord.Reply.Root.URI,
113 RootCID: commentRecord.Reply.Root.CID,
114 ParentURI: commentRecord.Reply.Parent.URI,
115 ParentCID: commentRecord.Reply.Parent.CID,
116 Content: commentRecord.Content,
117 ContentFacets: facetsJSON,
118 Embed: embedJSON,
119 ContentLabels: labelsJSON,
120 Langs: commentRecord.Langs,
121 CreatedAt: createdAt,
122 IndexedAt: time.Now(),
123 }
124
125 // Atomically: Index comment + Update parent counts
126 if err := c.indexCommentAndUpdateCounts(ctx, comment); err != nil {
127 return fmt.Errorf("failed to index comment and update counts: %w", err)
128 }
129
130 log.Printf("✓ Indexed comment: %s (on %s)", uri, comment.ParentURI)
131 return nil
132}
133
134// updateComment updates an existing comment's content fields
135func (c *CommentEventConsumer) updateComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
136 if commit.Record == nil {
137 return fmt.Errorf("comment update event missing record data")
138 }
139
140 // Parse the updated comment record
141 commentRecord, err := parseCommentRecord(commit.Record)
142 if err != nil {
143 return fmt.Errorf("failed to parse comment record: %w", err)
144 }
145
146 // SECURITY: Validate this is a legitimate update
147 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil {
148 log.Printf("🚨 SECURITY: Rejecting comment update: %v", err)
149 return err
150 }
151
152 // Build AT-URI for the comment being updated
153 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", repoDID, commit.RKey)
154
155 // Fetch existing comment to validate threading references are immutable
156 existingComment, err := c.commentRepo.GetByURI(ctx, uri)
157 if err != nil {
158 if err == comments.ErrCommentNotFound {
159 // Comment doesn't exist yet - might arrive out of order
160 log.Printf("Warning: Update event for non-existent comment: %s (will be indexed on CREATE)", uri)
161 return nil
162 }
163 return fmt.Errorf("failed to get existing comment for validation: %w", err)
164 }
165
166 // SECURITY: Threading references are IMMUTABLE after creation
167 // Reject updates that attempt to change root/parent (prevents thread hijacking)
168 if existingComment.RootURI != commentRecord.Reply.Root.URI ||
169 existingComment.RootCID != commentRecord.Reply.Root.CID ||
170 existingComment.ParentURI != commentRecord.Reply.Parent.URI ||
171 existingComment.ParentCID != commentRecord.Reply.Parent.CID {
172 log.Printf("🚨 SECURITY: Rejecting comment update - threading references are immutable: %s", uri)
173 log.Printf(" Existing root: %s (CID: %s)", existingComment.RootURI, existingComment.RootCID)
174 log.Printf(" Incoming root: %s (CID: %s)", commentRecord.Reply.Root.URI, commentRecord.Reply.Root.CID)
175 log.Printf(" Existing parent: %s (CID: %s)", existingComment.ParentURI, existingComment.ParentCID)
176 log.Printf(" Incoming parent: %s (CID: %s)", commentRecord.Reply.Parent.URI, commentRecord.Reply.Parent.CID)
177 return fmt.Errorf("comment threading references cannot be changed after creation")
178 }
179
180 // Serialize optional JSON fields
181 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord)
182
183 // Build comment update entity (preserves vote counts and created_at)
184 comment := &comments.Comment{
185 URI: uri,
186 CID: commit.CID,
187 Content: commentRecord.Content,
188 ContentFacets: facetsJSON,
189 Embed: embedJSON,
190 ContentLabels: labelsJSON,
191 Langs: commentRecord.Langs,
192 }
193
194 // Update the comment in repository
195 if err := c.commentRepo.Update(ctx, comment); err != nil {
196 return fmt.Errorf("failed to update comment: %w", err)
197 }
198
199 log.Printf("✓ Updated comment: %s", uri)
200 return nil
201}
202
203// deleteComment soft-deletes a comment and updates parent counts
204func (c *CommentEventConsumer) deleteComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
205 // Build AT-URI for the comment being deleted
206 uri := fmt.Sprintf("at://%s/social.coves.feed.comment/%s", repoDID, commit.RKey)
207
208 // Get existing comment to know its parent (for decrementing the right counter)
209 existingComment, err := c.commentRepo.GetByURI(ctx, uri)
210 if err != nil {
211 if err == comments.ErrCommentNotFound {
212 // Idempotent: Comment already deleted or never existed
213 log.Printf("Comment already deleted or not found: %s", uri)
214 return nil
215 }
216 return fmt.Errorf("failed to get existing comment: %w", err)
217 }
218
219 // Atomically: Soft-delete comment + Update parent counts
220 if err := c.deleteCommentAndUpdateCounts(ctx, existingComment); err != nil {
221 return fmt.Errorf("failed to delete comment and update counts: %w", err)
222 }
223
224 log.Printf("✓ Deleted comment: %s", uri)
225 return nil
226}
227
228// indexCommentAndUpdateCounts atomically indexes a comment and updates parent counts
229func (c *CommentEventConsumer) indexCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error {
230 tx, err := c.db.BeginTx(ctx, nil)
231 if err != nil {
232 return fmt.Errorf("failed to begin transaction: %w", err)
233 }
234 defer func() {
235 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
236 log.Printf("Failed to rollback transaction: %v", rollbackErr)
237 }
238 }()
239
240 // 1. Check if comment exists and handle resurrection case
241 // In atProto, deleted records' rkeys become available - users can recreate with same rkey
242 // We must distinguish: idempotent replay (skip) vs resurrection (update + restore counts)
243 var existingID int64
244 var existingDeletedAt *time.Time
245 checkQuery := `SELECT id, deleted_at FROM comments WHERE uri = $1`
246 checkErr := tx.QueryRowContext(ctx, checkQuery, comment.URI).Scan(&existingID, &existingDeletedAt)
247
248 var commentID int64
249
250 if checkErr == nil {
251 // Comment exists
252 if existingDeletedAt == nil {
253 // Not deleted - this is an idempotent replay, skip gracefully
254 log.Printf("Comment already indexed: %s (idempotent replay)", comment.URI)
255 if commitErr := tx.Commit(); commitErr != nil {
256 return fmt.Errorf("failed to commit transaction: %w", commitErr)
257 }
258 return nil
259 }
260
261 // Comment was soft-deleted, now being recreated (resurrection)
262 // This is a NEW record with same rkey - update ALL fields including threading refs
263 // User may have deleted old comment and created a new one on a different parent/root
264 log.Printf("Resurrecting previously deleted comment: %s", comment.URI)
265 commentID = existingID
266
267 resurrectQuery := `
268 UPDATE comments
269 SET
270 cid = $1,
271 commenter_did = $2,
272 root_uri = $3,
273 root_cid = $4,
274 parent_uri = $5,
275 parent_cid = $6,
276 content = $7,
277 content_facets = $8,
278 embed = $9,
279 content_labels = $10,
280 langs = $11,
281 created_at = $12,
282 indexed_at = $13,
283 deleted_at = NULL,
284 reply_count = 0
285 WHERE id = $14
286 `
287
288 _, err = tx.ExecContext(
289 ctx, resurrectQuery,
290 comment.CID,
291 comment.CommenterDID,
292 comment.RootURI,
293 comment.RootCID,
294 comment.ParentURI,
295 comment.ParentCID,
296 comment.Content,
297 comment.ContentFacets,
298 comment.Embed,
299 comment.ContentLabels,
300 pq.Array(comment.Langs),
301 comment.CreatedAt,
302 time.Now(),
303 commentID,
304 )
305 if err != nil {
306 return fmt.Errorf("failed to resurrect comment: %w", err)
307 }
308
309 } else if checkErr == sql.ErrNoRows {
310 // Comment doesn't exist - insert new comment
311 insertQuery := `
312 INSERT INTO comments (
313 uri, cid, rkey, commenter_did,
314 root_uri, root_cid, parent_uri, parent_cid,
315 content, content_facets, embed, content_labels, langs,
316 created_at, indexed_at
317 ) VALUES (
318 $1, $2, $3, $4,
319 $5, $6, $7, $8,
320 $9, $10, $11, $12, $13,
321 $14, $15
322 )
323 RETURNING id
324 `
325
326 err = tx.QueryRowContext(
327 ctx, insertQuery,
328 comment.URI, comment.CID, comment.RKey, comment.CommenterDID,
329 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID,
330 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs),
331 comment.CreatedAt, time.Now(),
332 ).Scan(&commentID)
333 if err != nil {
334 return fmt.Errorf("failed to insert comment: %w", err)
335 }
336
337 } else {
338 // Unexpected error checking for existing comment
339 return fmt.Errorf("failed to check for existing comment: %w", checkErr)
340 }
341
342 // 1.5. Reconcile reply_count for this newly inserted comment
343 // In case any replies arrived out-of-order before this parent was indexed
344 reconcileQuery := `
345 UPDATE comments
346 SET reply_count = (
347 SELECT COUNT(*)
348 FROM comments c
349 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL
350 )
351 WHERE id = $2
352 `
353 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, comment.URI, commentID)
354 if reconcileErr != nil {
355 log.Printf("Warning: Failed to reconcile reply_count for %s: %v", comment.URI, reconcileErr)
356 // Continue anyway - this is a best-effort reconciliation
357 }
358
359 // 2. Update parent counts atomically
360 // Parent could be a post (increment comment_count) or a comment (increment reply_count)
361 // Parse collection from parent URI to determine target table
362 //
363 // FIXME(P1): Post comment_count reconciliation not implemented
364 // When a comment arrives before its parent post (common with cross-repo Jetstream ordering),
365 // the post update below returns 0 rows and we only log a warning. Later, when the post
366 // is indexed by the post consumer, there's NO reconciliation logic to count pre-existing
367 // comments. This causes posts to have permanently stale comment_count values.
368 //
369 // FIX REQUIRED: Post consumer MUST implement the same reconciliation pattern as comments
370 // (see lines 292-305 above). When indexing a new post, count any comments where parent_uri
371 // matches the post URI and set comment_count accordingly.
372 //
373 // Test demonstrating issue: TestCommentConsumer_PostCountReconciliation_Limitation
374 collection := utils.ExtractCollectionFromURI(comment.ParentURI)
375
376 var updateQuery string
377 switch collection {
378 case "social.coves.community.post":
379 // Comment on post - update posts.comment_count
380 updateQuery = `
381 UPDATE posts
382 SET comment_count = comment_count + 1
383 WHERE uri = $1 AND deleted_at IS NULL
384 `
385
386 case "social.coves.feed.comment":
387 // Reply to comment - update comments.reply_count
388 updateQuery = `
389 UPDATE comments
390 SET reply_count = reply_count + 1
391 WHERE uri = $1 AND deleted_at IS NULL
392 `
393
394 default:
395 // Unknown or unsupported parent collection
396 // Comment is still indexed, we just don't update parent counts
397 log.Printf("Comment parent has unsupported collection: %s (comment indexed, parent count not updated)", collection)
398 if commitErr := tx.Commit(); commitErr != nil {
399 return fmt.Errorf("failed to commit transaction: %w", commitErr)
400 }
401 return nil
402 }
403
404 result, err := tx.ExecContext(ctx, updateQuery, comment.ParentURI)
405 if err != nil {
406 return fmt.Errorf("failed to update parent count: %w", err)
407 }
408
409 rowsAffected, err := result.RowsAffected()
410 if err != nil {
411 return fmt.Errorf("failed to check update result: %w", err)
412 }
413
414 // If parent not found, that's OK (parent might not be indexed yet)
415 if rowsAffected == 0 {
416 log.Printf("Warning: Parent not found or deleted: %s (comment indexed anyway)", comment.ParentURI)
417 }
418
419 // Commit transaction
420 if err := tx.Commit(); err != nil {
421 return fmt.Errorf("failed to commit transaction: %w", err)
422 }
423
424 return nil
425}
426
427// deleteCommentAndUpdateCounts atomically soft-deletes a comment and updates parent counts
428func (c *CommentEventConsumer) deleteCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error {
429 tx, err := c.db.BeginTx(ctx, nil)
430 if err != nil {
431 return fmt.Errorf("failed to begin transaction: %w", err)
432 }
433 defer func() {
434 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
435 log.Printf("Failed to rollback transaction: %v", rollbackErr)
436 }
437 }()
438
439 // 1. Soft-delete the comment (idempotent)
440 deleteQuery := `
441 UPDATE comments
442 SET deleted_at = $2
443 WHERE uri = $1 AND deleted_at IS NULL
444 `
445
446 result, err := tx.ExecContext(ctx, deleteQuery, comment.URI, time.Now())
447 if err != nil {
448 return fmt.Errorf("failed to delete comment: %w", err)
449 }
450
451 rowsAffected, err := result.RowsAffected()
452 if err != nil {
453 return fmt.Errorf("failed to check delete result: %w", err)
454 }
455
456 // Idempotent: If no rows affected, comment already deleted
457 if rowsAffected == 0 {
458 log.Printf("Comment already deleted: %s (idempotent)", comment.URI)
459 if commitErr := tx.Commit(); commitErr != nil {
460 return fmt.Errorf("failed to commit transaction: %w", commitErr)
461 }
462 return nil
463 }
464
465 // 2. Decrement parent counts atomically
466 // Parent could be a post or comment - parse collection to determine target table
467 collection := utils.ExtractCollectionFromURI(comment.ParentURI)
468
469 var updateQuery string
470 switch collection {
471 case "social.coves.community.post":
472 // Comment on post - decrement posts.comment_count
473 updateQuery = `
474 UPDATE posts
475 SET comment_count = GREATEST(0, comment_count - 1)
476 WHERE uri = $1 AND deleted_at IS NULL
477 `
478
479 case "social.coves.feed.comment":
480 // Reply to comment - decrement comments.reply_count
481 updateQuery = `
482 UPDATE comments
483 SET reply_count = GREATEST(0, reply_count - 1)
484 WHERE uri = $1 AND deleted_at IS NULL
485 `
486
487 default:
488 // Unknown or unsupported parent collection
489 // Comment is still deleted, we just don't update parent counts
490 log.Printf("Comment parent has unsupported collection: %s (comment deleted, parent count not updated)", collection)
491 if commitErr := tx.Commit(); commitErr != nil {
492 return fmt.Errorf("failed to commit transaction: %w", commitErr)
493 }
494 return nil
495 }
496
497 result, err = tx.ExecContext(ctx, updateQuery, comment.ParentURI)
498 if err != nil {
499 return fmt.Errorf("failed to update parent count: %w", err)
500 }
501
502 rowsAffected, err = result.RowsAffected()
503 if err != nil {
504 return fmt.Errorf("failed to check update result: %w", err)
505 }
506
507 // If parent not found, that's OK (parent might be deleted)
508 if rowsAffected == 0 {
509 log.Printf("Warning: Parent not found or deleted: %s (comment deleted anyway)", comment.ParentURI)
510 }
511
512 // Commit transaction
513 if err := tx.Commit(); err != nil {
514 return fmt.Errorf("failed to commit transaction: %w", err)
515 }
516
517 return nil
518}
519
520// validateCommentEvent performs security validation on comment events
521func (c *CommentEventConsumer) validateCommentEvent(ctx context.Context, repoDID string, comment *CommentRecordFromJetstream) error {
522 // SECURITY: Comments MUST come from user repositories (repo owner = commenter DID)
523 // The repository owner (repoDID) IS the commenter - comments are stored in user repos.
524 //
525 // We do NOT check if the user exists in AppView because:
526 // 1. Comment events may arrive before user events in Jetstream (race condition)
527 // 2. The comment came from the user's PDS repository (authenticated by PDS)
528 // 3. The database FK constraint was removed to allow out-of-order indexing
529 // 4. Orphaned comments (from never-indexed users) are harmless
530 //
531 // Security is maintained because:
532 // - Comment must come from user's own PDS repository (verified by atProto)
533 // - Fake DIDs will fail PDS authentication
534
535 // Validate DID format (basic sanity check)
536 if !strings.HasPrefix(repoDID, "did:") {
537 return fmt.Errorf("invalid commenter DID format: %s", repoDID)
538 }
539
540 // Validate content is not empty (required per lexicon)
541 if comment.Content == "" {
542 return fmt.Errorf("comment content is required")
543 }
544
545 // Validate content length (defensive check - PDS should enforce this)
546 // Per lexicon: max 3000 graphemes, ~30000 bytes
547 // We check bytes as a simple defensive measure
548 if len(comment.Content) > MaxCommentContentBytes {
549 return fmt.Errorf("comment content exceeds maximum length (%d bytes): got %d bytes", MaxCommentContentBytes, len(comment.Content))
550 }
551
552 // Validate reply references exist
553 if comment.Reply.Root.URI == "" || comment.Reply.Root.CID == "" {
554 return fmt.Errorf("invalid root reference: must have both URI and CID")
555 }
556
557 if comment.Reply.Parent.URI == "" || comment.Reply.Parent.CID == "" {
558 return fmt.Errorf("invalid parent reference: must have both URI and CID")
559 }
560
561 // Validate AT-URI structure for root and parent
562 if err := validateATURI(comment.Reply.Root.URI); err != nil {
563 return fmt.Errorf("invalid root URI: %w", err)
564 }
565
566 if err := validateATURI(comment.Reply.Parent.URI); err != nil {
567 return fmt.Errorf("invalid parent URI: %w", err)
568 }
569
570 return nil
571}
572
573// validateATURI performs basic structure validation on AT-URIs
574// Format: at://did:method:id/collection/rkey
575// This is defensive validation - we trust PDS but catch obviously malformed URIs
576func validateATURI(uri string) error {
577 if !strings.HasPrefix(uri, ATProtoScheme) {
578 return fmt.Errorf("must start with %s", ATProtoScheme)
579 }
580
581 // Remove at:// prefix and split by /
582 withoutScheme := strings.TrimPrefix(uri, ATProtoScheme)
583 parts := strings.Split(withoutScheme, "/")
584
585 // Must have at least 3 parts: did, collection, rkey
586 if len(parts) < 3 {
587 return fmt.Errorf("invalid structure (expected at://did/collection/rkey)")
588 }
589
590 // First part should be a DID
591 if !strings.HasPrefix(parts[0], "did:") {
592 return fmt.Errorf("repository identifier must be a DID")
593 }
594
595 // Collection and rkey should not be empty
596 if parts[1] == "" || parts[2] == "" {
597 return fmt.Errorf("collection and rkey cannot be empty")
598 }
599
600 return nil
601}
602
603// CommentRecordFromJetstream represents a comment record as received from Jetstream
604// Matches social.coves.feed.comment lexicon
605type CommentRecordFromJetstream struct {
606 Labels interface{} `json:"labels,omitempty"`
607 Embed map[string]interface{} `json:"embed,omitempty"`
608 Reply ReplyRefFromJetstream `json:"reply"`
609 Type string `json:"$type"`
610 Content string `json:"content"`
611 CreatedAt string `json:"createdAt"`
612 Facets []interface{} `json:"facets,omitempty"`
613 Langs []string `json:"langs,omitempty"`
614}
615
616// ReplyRefFromJetstream represents the threading structure
617type ReplyRefFromJetstream struct {
618 Root StrongRefFromJetstream `json:"root"`
619 Parent StrongRefFromJetstream `json:"parent"`
620}
621
622// parseCommentRecord parses a comment record from Jetstream event data
623func parseCommentRecord(record map[string]interface{}) (*CommentRecordFromJetstream, error) {
624 // Marshal to JSON and back for proper type conversion
625 recordJSON, err := json.Marshal(record)
626 if err != nil {
627 return nil, fmt.Errorf("failed to marshal record: %w", err)
628 }
629
630 var comment CommentRecordFromJetstream
631 if err := json.Unmarshal(recordJSON, &comment); err != nil {
632 return nil, fmt.Errorf("failed to unmarshal comment record: %w", err)
633 }
634
635 // Validate required fields
636 if comment.Content == "" {
637 return nil, fmt.Errorf("comment record missing content field")
638 }
639
640 if comment.CreatedAt == "" {
641 return nil, fmt.Errorf("comment record missing createdAt field")
642 }
643
644 return &comment, nil
645}
646
647// serializeOptionalFields serializes facets, embed, and labels from a comment record to JSON strings
648// Returns nil pointers for empty/nil fields (DRY helper to avoid duplication)
649func serializeOptionalFields(commentRecord *CommentRecordFromJetstream) (facetsJSON, embedJSON, labelsJSON *string) {
650 // Serialize facets if present
651 if len(commentRecord.Facets) > 0 {
652 if facetsBytes, err := json.Marshal(commentRecord.Facets); err == nil {
653 facetsStr := string(facetsBytes)
654 facetsJSON = &facetsStr
655 }
656 }
657
658 // Serialize embed if present
659 if len(commentRecord.Embed) > 0 {
660 if embedBytes, err := json.Marshal(commentRecord.Embed); err == nil {
661 embedStr := string(embedBytes)
662 embedJSON = &embedStr
663 }
664 }
665
666 // Serialize labels if present
667 if commentRecord.Labels != nil {
668 if labelsBytes, err := json.Marshal(commentRecord.Labels); err == nil {
669 labelsStr := string(labelsBytes)
670 labelsJSON = &labelsStr
671 }
672 }
673
674 return facetsJSON, embedJSON, labelsJSON
675}