A community based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "Coves/internal/atproto/utils"
5 "Coves/internal/core/comments"
6 "context"
7 "database/sql"
8 "encoding/json"
9 "fmt"
10 "log"
11 "strings"
12 "time"
13
14 "github.com/lib/pq"
15)
16
17// Constants for comment validation and processing
18const (
19 // CommentCollection is the lexicon collection identifier for comments
20 CommentCollection = "social.coves.community.comment"
21
22 // ATProtoScheme is the URI scheme for atProto AT-URIs
23 ATProtoScheme = "at://"
24
25 // MaxCommentContentBytes is the maximum allowed size for comment content
26 // Per lexicon: max 3000 graphemes, ~30000 bytes
27 MaxCommentContentBytes = 30000
28)
29
30// CommentEventConsumer consumes comment-related events from Jetstream
31// Handles CREATE, UPDATE, and DELETE operations for social.coves.community.comment
32type CommentEventConsumer struct {
33 commentRepo comments.Repository
34 db *sql.DB // Direct DB access for atomic count updates
35}
36
37// NewCommentEventConsumer creates a new Jetstream consumer for comment events
38func NewCommentEventConsumer(
39 commentRepo comments.Repository,
40 db *sql.DB,
41) *CommentEventConsumer {
42 return &CommentEventConsumer{
43 commentRepo: commentRepo,
44 db: db,
45 }
46}
47
48// HandleEvent processes a Jetstream event for comment records
49func (c *CommentEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
50 // We only care about commit events for comment records
51 if event.Kind != "commit" || event.Commit == nil {
52 return nil
53 }
54
55 commit := event.Commit
56
57 // Handle comment record operations
58 if commit.Collection == CommentCollection {
59 switch commit.Operation {
60 case "create":
61 return c.createComment(ctx, event.Did, commit)
62 case "update":
63 return c.updateComment(ctx, event.Did, commit)
64 case "delete":
65 return c.deleteComment(ctx, event.Did, commit)
66 }
67 }
68
69 // Silently ignore other operations and collections
70 return nil
71}
72
73// createComment indexes a new comment from the firehose and updates parent counts
74func (c *CommentEventConsumer) createComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
75 if commit.Record == nil {
76 return fmt.Errorf("comment create event missing record data")
77 }
78
79 // Parse the comment record
80 commentRecord, err := parseCommentRecord(commit.Record)
81 if err != nil {
82 return fmt.Errorf("failed to parse comment record: %w", err)
83 }
84
85 // SECURITY: Validate this is a legitimate comment event
86 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil {
87 log.Printf("🚨 SECURITY: Rejecting comment event: %v", err)
88 return err
89 }
90
91 // Build AT-URI for this comment
92 // Format: at://commenter_did/social.coves.community.comment/rkey
93 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey)
94
95 // Parse timestamp from record
96 createdAt, err := time.Parse(time.RFC3339, commentRecord.CreatedAt)
97 if err != nil {
98 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err)
99 createdAt = time.Now()
100 }
101
102 // Serialize optional JSON fields
103 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord)
104
105 // Build comment entity
106 comment := &comments.Comment{
107 URI: uri,
108 CID: commit.CID,
109 RKey: commit.RKey,
110 CommenterDID: repoDID, // Comment comes from user's repository
111 RootURI: commentRecord.Reply.Root.URI,
112 RootCID: commentRecord.Reply.Root.CID,
113 ParentURI: commentRecord.Reply.Parent.URI,
114 ParentCID: commentRecord.Reply.Parent.CID,
115 Content: commentRecord.Content,
116 ContentFacets: facetsJSON,
117 Embed: embedJSON,
118 ContentLabels: labelsJSON,
119 Langs: commentRecord.Langs,
120 CreatedAt: createdAt,
121 IndexedAt: time.Now(),
122 }
123
124 // Atomically: Index comment + Update parent counts
125 if err := c.indexCommentAndUpdateCounts(ctx, comment); err != nil {
126 return fmt.Errorf("failed to index comment and update counts: %w", err)
127 }
128
129 log.Printf("✓ Indexed comment: %s (on %s)", uri, comment.ParentURI)
130 return nil
131}
132
133// updateComment updates an existing comment's content fields
134func (c *CommentEventConsumer) updateComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
135 if commit.Record == nil {
136 return fmt.Errorf("comment update event missing record data")
137 }
138
139 // Parse the updated comment record
140 commentRecord, err := parseCommentRecord(commit.Record)
141 if err != nil {
142 return fmt.Errorf("failed to parse comment record: %w", err)
143 }
144
145 // SECURITY: Validate this is a legitimate update
146 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil {
147 log.Printf("🚨 SECURITY: Rejecting comment update: %v", err)
148 return err
149 }
150
151 // Build AT-URI for the comment being updated
152 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey)
153
154 // Fetch existing comment to validate threading references are immutable
155 existingComment, err := c.commentRepo.GetByURI(ctx, uri)
156 if err != nil {
157 if err == comments.ErrCommentNotFound {
158 // Comment doesn't exist yet - might arrive out of order
159 log.Printf("Warning: Update event for non-existent comment: %s (will be indexed on CREATE)", uri)
160 return nil
161 }
162 return fmt.Errorf("failed to get existing comment for validation: %w", err)
163 }
164
165 // SECURITY: Threading references are IMMUTABLE after creation
166 // Reject updates that attempt to change root/parent (prevents thread hijacking)
167 if existingComment.RootURI != commentRecord.Reply.Root.URI ||
168 existingComment.RootCID != commentRecord.Reply.Root.CID ||
169 existingComment.ParentURI != commentRecord.Reply.Parent.URI ||
170 existingComment.ParentCID != commentRecord.Reply.Parent.CID {
171 log.Printf("🚨 SECURITY: Rejecting comment update - threading references are immutable: %s", uri)
172 log.Printf(" Existing root: %s (CID: %s)", existingComment.RootURI, existingComment.RootCID)
173 log.Printf(" Incoming root: %s (CID: %s)", commentRecord.Reply.Root.URI, commentRecord.Reply.Root.CID)
174 log.Printf(" Existing parent: %s (CID: %s)", existingComment.ParentURI, existingComment.ParentCID)
175 log.Printf(" Incoming parent: %s (CID: %s)", commentRecord.Reply.Parent.URI, commentRecord.Reply.Parent.CID)
176 return fmt.Errorf("comment threading references cannot be changed after creation")
177 }
178
179 // Serialize optional JSON fields
180 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord)
181
182 // Build comment update entity (preserves vote counts and created_at)
183 comment := &comments.Comment{
184 URI: uri,
185 CID: commit.CID,
186 Content: commentRecord.Content,
187 ContentFacets: facetsJSON,
188 Embed: embedJSON,
189 ContentLabels: labelsJSON,
190 Langs: commentRecord.Langs,
191 }
192
193 // Update the comment in repository
194 if err := c.commentRepo.Update(ctx, comment); err != nil {
195 return fmt.Errorf("failed to update comment: %w", err)
196 }
197
198 log.Printf("✓ Updated comment: %s", uri)
199 return nil
200}
201
202// deleteComment soft-deletes a comment and updates parent counts
203func (c *CommentEventConsumer) deleteComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
204 // Build AT-URI for the comment being deleted
205 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey)
206
207 // Get existing comment to know its parent (for decrementing the right counter)
208 existingComment, err := c.commentRepo.GetByURI(ctx, uri)
209 if err != nil {
210 if err == comments.ErrCommentNotFound {
211 // Idempotent: Comment already deleted or never existed
212 log.Printf("Comment already deleted or not found: %s", uri)
213 return nil
214 }
215 return fmt.Errorf("failed to get existing comment: %w", err)
216 }
217
218 // Atomically: Soft-delete comment + Update parent counts
219 if err := c.deleteCommentAndUpdateCounts(ctx, existingComment); err != nil {
220 return fmt.Errorf("failed to delete comment and update counts: %w", err)
221 }
222
223 log.Printf("✓ Deleted comment: %s", uri)
224 return nil
225}
226
227// indexCommentAndUpdateCounts atomically indexes a comment and updates parent counts
228func (c *CommentEventConsumer) indexCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error {
229 tx, err := c.db.BeginTx(ctx, nil)
230 if err != nil {
231 return fmt.Errorf("failed to begin transaction: %w", err)
232 }
233 defer func() {
234 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
235 log.Printf("Failed to rollback transaction: %v", rollbackErr)
236 }
237 }()
238
239 // 1. Check if comment exists and handle resurrection case
240 // In atProto, deleted records' rkeys become available - users can recreate with same rkey
241 // We must distinguish: idempotent replay (skip) vs resurrection (update + restore counts)
242 var existingID int64
243 var existingDeletedAt *time.Time
244 checkQuery := `SELECT id, deleted_at FROM comments WHERE uri = $1`
245 checkErr := tx.QueryRowContext(ctx, checkQuery, comment.URI).Scan(&existingID, &existingDeletedAt)
246
247 var commentID int64
248
249 if checkErr == nil {
250 // Comment exists
251 if existingDeletedAt == nil {
252 // Not deleted - this is an idempotent replay, skip gracefully
253 log.Printf("Comment already indexed: %s (idempotent replay)", comment.URI)
254 if commitErr := tx.Commit(); commitErr != nil {
255 return fmt.Errorf("failed to commit transaction: %w", commitErr)
256 }
257 return nil
258 }
259
260 // Comment was soft-deleted, now being recreated (resurrection)
261 // This is a NEW record with same rkey - update ALL fields including threading refs
262 // User may have deleted old comment and created a new one on a different parent/root
263 log.Printf("Resurrecting previously deleted comment: %s", comment.URI)
264 commentID = existingID
265
266 resurrectQuery := `
267 UPDATE comments
268 SET
269 cid = $1,
270 commenter_did = $2,
271 root_uri = $3,
272 root_cid = $4,
273 parent_uri = $5,
274 parent_cid = $6,
275 content = $7,
276 content_facets = $8,
277 embed = $9,
278 content_labels = $10,
279 langs = $11,
280 created_at = $12,
281 indexed_at = $13,
282 deleted_at = NULL,
283 reply_count = 0
284 WHERE id = $14
285 `
286
287 _, err = tx.ExecContext(
288 ctx, resurrectQuery,
289 comment.CID,
290 comment.CommenterDID,
291 comment.RootURI,
292 comment.RootCID,
293 comment.ParentURI,
294 comment.ParentCID,
295 comment.Content,
296 comment.ContentFacets,
297 comment.Embed,
298 comment.ContentLabels,
299 pq.Array(comment.Langs),
300 comment.CreatedAt,
301 time.Now(),
302 commentID,
303 )
304 if err != nil {
305 return fmt.Errorf("failed to resurrect comment: %w", err)
306 }
307
308 } else if checkErr == sql.ErrNoRows {
309 // Comment doesn't exist - insert new comment
310 insertQuery := `
311 INSERT INTO comments (
312 uri, cid, rkey, commenter_did,
313 root_uri, root_cid, parent_uri, parent_cid,
314 content, content_facets, embed, content_labels, langs,
315 created_at, indexed_at
316 ) VALUES (
317 $1, $2, $3, $4,
318 $5, $6, $7, $8,
319 $9, $10, $11, $12, $13,
320 $14, $15
321 )
322 RETURNING id
323 `
324
325 err = tx.QueryRowContext(
326 ctx, insertQuery,
327 comment.URI, comment.CID, comment.RKey, comment.CommenterDID,
328 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID,
329 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs),
330 comment.CreatedAt, time.Now(),
331 ).Scan(&commentID)
332 if err != nil {
333 return fmt.Errorf("failed to insert comment: %w", err)
334 }
335
336 } else {
337 // Unexpected error checking for existing comment
338 return fmt.Errorf("failed to check for existing comment: %w", checkErr)
339 }
340
341 // 1.5. Reconcile reply_count for this newly inserted comment
342 // In case any replies arrived out-of-order before this parent was indexed
343 reconcileQuery := `
344 UPDATE comments
345 SET reply_count = (
346 SELECT COUNT(*)
347 FROM comments c
348 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL
349 )
350 WHERE id = $2
351 `
352 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, comment.URI, commentID)
353 if reconcileErr != nil {
354 log.Printf("Warning: Failed to reconcile reply_count for %s: %v", comment.URI, reconcileErr)
355 // Continue anyway - this is a best-effort reconciliation
356 }
357
358 // 2. Update parent counts atomically
359 // Parent could be a post (increment comment_count) or a comment (increment reply_count)
360 // Parse collection from parent URI to determine target table
361 //
362 // FIXME(P1): Post comment_count reconciliation not implemented
363 // When a comment arrives before its parent post (common with cross-repo Jetstream ordering),
364 // the post update below returns 0 rows and we only log a warning. Later, when the post
365 // is indexed by the post consumer, there's NO reconciliation logic to count pre-existing
366 // comments. This causes posts to have permanently stale comment_count values.
367 //
368 // FIX REQUIRED: Post consumer MUST implement the same reconciliation pattern as comments
369 // (see lines 292-305 above). When indexing a new post, count any comments where parent_uri
370 // matches the post URI and set comment_count accordingly.
371 //
372 // Test demonstrating issue: TestCommentConsumer_PostCountReconciliation_Limitation
373 collection := utils.ExtractCollectionFromURI(comment.ParentURI)
374
375 var updateQuery string
376 switch collection {
377 case "social.coves.community.post":
378 // Comment on post - update posts.comment_count
379 updateQuery = `
380 UPDATE posts
381 SET comment_count = comment_count + 1
382 WHERE uri = $1 AND deleted_at IS NULL
383 `
384
385 case "social.coves.community.comment":
386 // Reply to comment - update comments.reply_count
387 updateQuery = `
388 UPDATE comments
389 SET reply_count = reply_count + 1
390 WHERE uri = $1 AND deleted_at IS NULL
391 `
392
393 default:
394 // Unknown or unsupported parent collection
395 // Comment is still indexed, we just don't update parent counts
396 log.Printf("Comment parent has unsupported collection: %s (comment indexed, parent count not updated)", collection)
397 if commitErr := tx.Commit(); commitErr != nil {
398 return fmt.Errorf("failed to commit transaction: %w", commitErr)
399 }
400 return nil
401 }
402
403 result, err := tx.ExecContext(ctx, updateQuery, comment.ParentURI)
404 if err != nil {
405 return fmt.Errorf("failed to update parent count: %w", err)
406 }
407
408 rowsAffected, err := result.RowsAffected()
409 if err != nil {
410 return fmt.Errorf("failed to check update result: %w", err)
411 }
412
413 // If parent not found, that's OK (parent might not be indexed yet)
414 if rowsAffected == 0 {
415 log.Printf("Warning: Parent not found or deleted: %s (comment indexed anyway)", comment.ParentURI)
416 }
417
418 // Commit transaction
419 if err := tx.Commit(); err != nil {
420 return fmt.Errorf("failed to commit transaction: %w", err)
421 }
422
423 return nil
424}
425
426// deleteCommentAndUpdateCounts atomically soft-deletes a comment and updates parent counts
427func (c *CommentEventConsumer) deleteCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error {
428 tx, err := c.db.BeginTx(ctx, nil)
429 if err != nil {
430 return fmt.Errorf("failed to begin transaction: %w", err)
431 }
432 defer func() {
433 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
434 log.Printf("Failed to rollback transaction: %v", rollbackErr)
435 }
436 }()
437
438 // 1. Soft-delete the comment (idempotent)
439 deleteQuery := `
440 UPDATE comments
441 SET deleted_at = $2
442 WHERE uri = $1 AND deleted_at IS NULL
443 `
444
445 result, err := tx.ExecContext(ctx, deleteQuery, comment.URI, time.Now())
446 if err != nil {
447 return fmt.Errorf("failed to delete comment: %w", err)
448 }
449
450 rowsAffected, err := result.RowsAffected()
451 if err != nil {
452 return fmt.Errorf("failed to check delete result: %w", err)
453 }
454
455 // Idempotent: If no rows affected, comment already deleted
456 if rowsAffected == 0 {
457 log.Printf("Comment already deleted: %s (idempotent)", comment.URI)
458 if commitErr := tx.Commit(); commitErr != nil {
459 return fmt.Errorf("failed to commit transaction: %w", commitErr)
460 }
461 return nil
462 }
463
464 // 2. Decrement parent counts atomically
465 // Parent could be a post or comment - parse collection to determine target table
466 collection := utils.ExtractCollectionFromURI(comment.ParentURI)
467
468 var updateQuery string
469 switch collection {
470 case "social.coves.community.post":
471 // Comment on post - decrement posts.comment_count
472 updateQuery = `
473 UPDATE posts
474 SET comment_count = GREATEST(0, comment_count - 1)
475 WHERE uri = $1 AND deleted_at IS NULL
476 `
477
478 case "social.coves.community.comment":
479 // Reply to comment - decrement comments.reply_count
480 updateQuery = `
481 UPDATE comments
482 SET reply_count = GREATEST(0, reply_count - 1)
483 WHERE uri = $1 AND deleted_at IS NULL
484 `
485
486 default:
487 // Unknown or unsupported parent collection
488 // Comment is still deleted, we just don't update parent counts
489 log.Printf("Comment parent has unsupported collection: %s (comment deleted, parent count not updated)", collection)
490 if commitErr := tx.Commit(); commitErr != nil {
491 return fmt.Errorf("failed to commit transaction: %w", commitErr)
492 }
493 return nil
494 }
495
496 result, err = tx.ExecContext(ctx, updateQuery, comment.ParentURI)
497 if err != nil {
498 return fmt.Errorf("failed to update parent count: %w", err)
499 }
500
501 rowsAffected, err = result.RowsAffected()
502 if err != nil {
503 return fmt.Errorf("failed to check update result: %w", err)
504 }
505
506 // If parent not found, that's OK (parent might be deleted)
507 if rowsAffected == 0 {
508 log.Printf("Warning: Parent not found or deleted: %s (comment deleted anyway)", comment.ParentURI)
509 }
510
511 // Commit transaction
512 if err := tx.Commit(); err != nil {
513 return fmt.Errorf("failed to commit transaction: %w", err)
514 }
515
516 return nil
517}
518
519// validateCommentEvent performs security validation on comment events
520func (c *CommentEventConsumer) validateCommentEvent(ctx context.Context, repoDID string, comment *CommentRecordFromJetstream) error {
521 // SECURITY: Comments MUST come from user repositories (repo owner = commenter DID)
522 // The repository owner (repoDID) IS the commenter - comments are stored in user repos.
523 //
524 // We do NOT check if the user exists in AppView because:
525 // 1. Comment events may arrive before user events in Jetstream (race condition)
526 // 2. The comment came from the user's PDS repository (authenticated by PDS)
527 // 3. The database FK constraint was removed to allow out-of-order indexing
528 // 4. Orphaned comments (from never-indexed users) are harmless
529 //
530 // Security is maintained because:
531 // - Comment must come from user's own PDS repository (verified by atProto)
532 // - Fake DIDs will fail PDS authentication
533
534 // Validate DID format (basic sanity check)
535 if !strings.HasPrefix(repoDID, "did:") {
536 return fmt.Errorf("invalid commenter DID format: %s", repoDID)
537 }
538
539 // Validate content is not empty (required per lexicon)
540 if comment.Content == "" {
541 return fmt.Errorf("comment content is required")
542 }
543
544 // Validate content length (defensive check - PDS should enforce this)
545 // Per lexicon: max 3000 graphemes, ~30000 bytes
546 // We check bytes as a simple defensive measure
547 if len(comment.Content) > MaxCommentContentBytes {
548 return fmt.Errorf("comment content exceeds maximum length (%d bytes): got %d bytes", MaxCommentContentBytes, len(comment.Content))
549 }
550
551 // Validate reply references exist
552 if comment.Reply.Root.URI == "" || comment.Reply.Root.CID == "" {
553 return fmt.Errorf("invalid root reference: must have both URI and CID")
554 }
555
556 if comment.Reply.Parent.URI == "" || comment.Reply.Parent.CID == "" {
557 return fmt.Errorf("invalid parent reference: must have both URI and CID")
558 }
559
560 // Validate AT-URI structure for root and parent
561 if err := validateATURI(comment.Reply.Root.URI); err != nil {
562 return fmt.Errorf("invalid root URI: %w", err)
563 }
564
565 if err := validateATURI(comment.Reply.Parent.URI); err != nil {
566 return fmt.Errorf("invalid parent URI: %w", err)
567 }
568
569 return nil
570}
571
572// validateATURI performs basic structure validation on AT-URIs
573// Format: at://did:method:id/collection/rkey
574// This is defensive validation - we trust PDS but catch obviously malformed URIs
575func validateATURI(uri string) error {
576 if !strings.HasPrefix(uri, ATProtoScheme) {
577 return fmt.Errorf("must start with %s", ATProtoScheme)
578 }
579
580 // Remove at:// prefix and split by /
581 withoutScheme := strings.TrimPrefix(uri, ATProtoScheme)
582 parts := strings.Split(withoutScheme, "/")
583
584 // Must have at least 3 parts: did, collection, rkey
585 if len(parts) < 3 {
586 return fmt.Errorf("invalid structure (expected at://did/collection/rkey)")
587 }
588
589 // First part should be a DID
590 if !strings.HasPrefix(parts[0], "did:") {
591 return fmt.Errorf("repository identifier must be a DID")
592 }
593
594 // Collection and rkey should not be empty
595 if parts[1] == "" || parts[2] == "" {
596 return fmt.Errorf("collection and rkey cannot be empty")
597 }
598
599 return nil
600}
601
602// CommentRecordFromJetstream represents a comment record as received from Jetstream
603// Matches social.coves.community.comment lexicon
604type CommentRecordFromJetstream struct {
605 Labels interface{} `json:"labels,omitempty"`
606 Embed map[string]interface{} `json:"embed,omitempty"`
607 Reply ReplyRefFromJetstream `json:"reply"`
608 Type string `json:"$type"`
609 Content string `json:"content"`
610 CreatedAt string `json:"createdAt"`
611 Facets []interface{} `json:"facets,omitempty"`
612 Langs []string `json:"langs,omitempty"`
613}
614
615// ReplyRefFromJetstream represents the threading structure
616type ReplyRefFromJetstream struct {
617 Root StrongRefFromJetstream `json:"root"`
618 Parent StrongRefFromJetstream `json:"parent"`
619}
620
621// parseCommentRecord parses a comment record from Jetstream event data
622func parseCommentRecord(record map[string]interface{}) (*CommentRecordFromJetstream, error) {
623 // Marshal to JSON and back for proper type conversion
624 recordJSON, err := json.Marshal(record)
625 if err != nil {
626 return nil, fmt.Errorf("failed to marshal record: %w", err)
627 }
628
629 var comment CommentRecordFromJetstream
630 if err := json.Unmarshal(recordJSON, &comment); err != nil {
631 return nil, fmt.Errorf("failed to unmarshal comment record: %w", err)
632 }
633
634 // Validate required fields
635 if comment.Content == "" {
636 return nil, fmt.Errorf("comment record missing content field")
637 }
638
639 if comment.CreatedAt == "" {
640 return nil, fmt.Errorf("comment record missing createdAt field")
641 }
642
643 return &comment, nil
644}
645
646// serializeOptionalFields serializes facets, embed, and labels from a comment record to JSON strings
647// Returns nil pointers for empty/nil fields (DRY helper to avoid duplication)
648func serializeOptionalFields(commentRecord *CommentRecordFromJetstream) (facetsJSON, embedJSON, labelsJSON *string) {
649 // Serialize facets if present
650 if len(commentRecord.Facets) > 0 {
651 if facetsBytes, err := json.Marshal(commentRecord.Facets); err == nil {
652 facetsStr := string(facetsBytes)
653 facetsJSON = &facetsStr
654 }
655 }
656
657 // Serialize embed if present
658 if len(commentRecord.Embed) > 0 {
659 if embedBytes, err := json.Marshal(commentRecord.Embed); err == nil {
660 embedStr := string(embedBytes)
661 embedJSON = &embedStr
662 }
663 }
664
665 // Serialize labels if present
666 if commentRecord.Labels != nil {
667 if labelsBytes, err := json.Marshal(commentRecord.Labels); err == nil {
668 labelsStr := string(labelsBytes)
669 labelsJSON = &labelsStr
670 }
671 }
672
673 return facetsJSON, embedJSON, labelsJSON
674}