A community-based topic aggregation platform built on atproto
1package jetstream
2
import (
	"Coves/internal/atproto/utils"
	"Coves/internal/core/comments"
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/lib/pq"
)
16
// Shared constants used while validating and indexing comment records.
const (
	// MaxCommentContentBytes bounds the byte size of a comment's content.
	// The lexicon allows up to 3000 graphemes, which is roughly 30000 bytes.
	MaxCommentContentBytes = 30000

	// ATProtoScheme is the scheme prefix that every AT-URI starts with.
	ATProtoScheme = "at://"

	// CommentCollection names the lexicon collection that holds comment records.
	CommentCollection = "social.coves.community.comment"
)
29
30// CommentEventConsumer consumes comment-related events from Jetstream
31// Handles CREATE, UPDATE, and DELETE operations for social.coves.community.comment
32type CommentEventConsumer struct {
33 commentRepo comments.Repository
34 db *sql.DB // Direct DB access for atomic count updates
35}
36
37// NewCommentEventConsumer creates a new Jetstream consumer for comment events
38func NewCommentEventConsumer(
39 commentRepo comments.Repository,
40 db *sql.DB,
41) *CommentEventConsumer {
42 return &CommentEventConsumer{
43 commentRepo: commentRepo,
44 db: db,
45 }
46}
47
48// HandleEvent processes a Jetstream event for comment records
49func (c *CommentEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
50 // We only care about commit events for comment records
51 if event.Kind != "commit" || event.Commit == nil {
52 return nil
53 }
54
55 commit := event.Commit
56
57 // Handle comment record operations
58 if commit.Collection == CommentCollection {
59 switch commit.Operation {
60 case "create":
61 return c.createComment(ctx, event.Did, commit)
62 case "update":
63 return c.updateComment(ctx, event.Did, commit)
64 case "delete":
65 return c.deleteComment(ctx, event.Did, commit)
66 }
67 }
68
69 // Silently ignore other operations and collections
70 return nil
71}
72
73// createComment indexes a new comment from the firehose and updates parent counts
74func (c *CommentEventConsumer) createComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
75 if commit.Record == nil {
76 return fmt.Errorf("comment create event missing record data")
77 }
78
79 // Parse the comment record
80 commentRecord, err := parseCommentRecord(commit.Record)
81 if err != nil {
82 return fmt.Errorf("failed to parse comment record: %w", err)
83 }
84
85 // SECURITY: Validate this is a legitimate comment event
86 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil {
87 log.Printf("🚨 SECURITY: Rejecting comment event: %v", err)
88 return err
89 }
90
91 // Build AT-URI for this comment
92 // Format: at://commenter_did/social.coves.community.comment/rkey
93 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey)
94
95 // Parse timestamp from record
96 createdAt, err := time.Parse(time.RFC3339, commentRecord.CreatedAt)
97 if err != nil {
98 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err)
99 createdAt = time.Now()
100 }
101
102 // Serialize optional JSON fields
103 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord)
104
105 // Build comment entity
106 comment := &comments.Comment{
107 URI: uri,
108 CID: commit.CID,
109 RKey: commit.RKey,
110 CommenterDID: repoDID, // Comment comes from user's repository
111 RootURI: commentRecord.Reply.Root.URI,
112 RootCID: commentRecord.Reply.Root.CID,
113 ParentURI: commentRecord.Reply.Parent.URI,
114 ParentCID: commentRecord.Reply.Parent.CID,
115 Content: commentRecord.Content,
116 ContentFacets: facetsJSON,
117 Embed: embedJSON,
118 ContentLabels: labelsJSON,
119 Langs: commentRecord.Langs,
120 CreatedAt: createdAt,
121 IndexedAt: time.Now(),
122 }
123
124 // Atomically: Index comment + Update parent counts
125 if err := c.indexCommentAndUpdateCounts(ctx, comment); err != nil {
126 return fmt.Errorf("failed to index comment and update counts: %w", err)
127 }
128
129 log.Printf("✓ Indexed comment: %s (on %s)", uri, comment.ParentURI)
130 return nil
131}
132
133// updateComment updates an existing comment's content fields
134func (c *CommentEventConsumer) updateComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
135 if commit.Record == nil {
136 return fmt.Errorf("comment update event missing record data")
137 }
138
139 // Parse the updated comment record
140 commentRecord, err := parseCommentRecord(commit.Record)
141 if err != nil {
142 return fmt.Errorf("failed to parse comment record: %w", err)
143 }
144
145 // SECURITY: Validate this is a legitimate update
146 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil {
147 log.Printf("🚨 SECURITY: Rejecting comment update: %v", err)
148 return err
149 }
150
151 // Build AT-URI for the comment being updated
152 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey)
153
154 // Fetch existing comment to validate threading references are immutable
155 existingComment, err := c.commentRepo.GetByURI(ctx, uri)
156 if err != nil {
157 if err == comments.ErrCommentNotFound {
158 // Comment doesn't exist yet - might arrive out of order
159 log.Printf("Warning: Update event for non-existent comment: %s (will be indexed on CREATE)", uri)
160 return nil
161 }
162 return fmt.Errorf("failed to get existing comment for validation: %w", err)
163 }
164
165 // SECURITY: Threading references are IMMUTABLE after creation
166 // Reject updates that attempt to change root/parent (prevents thread hijacking)
167 if existingComment.RootURI != commentRecord.Reply.Root.URI ||
168 existingComment.RootCID != commentRecord.Reply.Root.CID ||
169 existingComment.ParentURI != commentRecord.Reply.Parent.URI ||
170 existingComment.ParentCID != commentRecord.Reply.Parent.CID {
171 log.Printf("🚨 SECURITY: Rejecting comment update - threading references are immutable: %s", uri)
172 log.Printf(" Existing root: %s (CID: %s)", existingComment.RootURI, existingComment.RootCID)
173 log.Printf(" Incoming root: %s (CID: %s)", commentRecord.Reply.Root.URI, commentRecord.Reply.Root.CID)
174 log.Printf(" Existing parent: %s (CID: %s)", existingComment.ParentURI, existingComment.ParentCID)
175 log.Printf(" Incoming parent: %s (CID: %s)", commentRecord.Reply.Parent.URI, commentRecord.Reply.Parent.CID)
176 return fmt.Errorf("comment threading references cannot be changed after creation")
177 }
178
179 // Serialize optional JSON fields
180 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord)
181
182 // Build comment update entity (preserves vote counts and created_at)
183 comment := &comments.Comment{
184 URI: uri,
185 CID: commit.CID,
186 Content: commentRecord.Content,
187 ContentFacets: facetsJSON,
188 Embed: embedJSON,
189 ContentLabels: labelsJSON,
190 Langs: commentRecord.Langs,
191 }
192
193 // Update the comment in repository
194 if err := c.commentRepo.Update(ctx, comment); err != nil {
195 return fmt.Errorf("failed to update comment: %w", err)
196 }
197
198 log.Printf("✓ Updated comment: %s", uri)
199 return nil
200}
201
202// deleteComment soft-deletes a comment and updates parent counts
203func (c *CommentEventConsumer) deleteComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
204 // Build AT-URI for the comment being deleted
205 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey)
206
207 // Get existing comment to know its parent (for decrementing the right counter)
208 existingComment, err := c.commentRepo.GetByURI(ctx, uri)
209 if err != nil {
210 if err == comments.ErrCommentNotFound {
211 // Idempotent: Comment already deleted or never existed
212 log.Printf("Comment already deleted or not found: %s", uri)
213 return nil
214 }
215 return fmt.Errorf("failed to get existing comment: %w", err)
216 }
217
218 // Atomically: Soft-delete comment + Update parent counts
219 if err := c.deleteCommentAndUpdateCounts(ctx, existingComment); err != nil {
220 return fmt.Errorf("failed to delete comment and update counts: %w", err)
221 }
222
223 log.Printf("✓ Deleted comment: %s", uri)
224 return nil
225}
226
227// indexCommentAndUpdateCounts atomically indexes a comment and updates parent counts
228func (c *CommentEventConsumer) indexCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error {
229 tx, err := c.db.BeginTx(ctx, nil)
230 if err != nil {
231 return fmt.Errorf("failed to begin transaction: %w", err)
232 }
233 defer func() {
234 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
235 log.Printf("Failed to rollback transaction: %v", rollbackErr)
236 }
237 }()
238
239 // 1. Check if comment exists and handle resurrection case
240 // In atProto, deleted records' rkeys become available - users can recreate with same rkey
241 // We must distinguish: idempotent replay (skip) vs resurrection (update + restore counts)
242 var existingID int64
243 var existingDeletedAt *time.Time
244 checkQuery := `SELECT id, deleted_at FROM comments WHERE uri = $1`
245 checkErr := tx.QueryRowContext(ctx, checkQuery, comment.URI).Scan(&existingID, &existingDeletedAt)
246
247 var commentID int64
248
249 if checkErr == nil {
250 // Comment exists
251 if existingDeletedAt == nil {
252 // Not deleted - this is an idempotent replay, skip gracefully
253 log.Printf("Comment already indexed: %s (idempotent replay)", comment.URI)
254 if commitErr := tx.Commit(); commitErr != nil {
255 return fmt.Errorf("failed to commit transaction: %w", commitErr)
256 }
257 return nil
258 }
259
260 // Comment was soft-deleted, now being recreated (resurrection)
261 // This is a NEW record with same rkey - update ALL fields including threading refs
262 // User may have deleted old comment and created a new one on a different parent/root
263 // Clear deletion metadata to restore the comment
264 log.Printf("Resurrecting previously deleted comment: %s", comment.URI)
265 commentID = existingID
266
267 resurrectQuery := `
268 UPDATE comments
269 SET
270 cid = $1,
271 commenter_did = $2,
272 root_uri = $3,
273 root_cid = $4,
274 parent_uri = $5,
275 parent_cid = $6,
276 content = $7,
277 content_facets = $8,
278 embed = $9,
279 content_labels = $10,
280 langs = $11,
281 created_at = $12,
282 indexed_at = $13,
283 deleted_at = NULL,
284 deletion_reason = NULL,
285 deleted_by = NULL,
286 reply_count = 0
287 WHERE id = $14
288 `
289
290 _, err = tx.ExecContext(
291 ctx, resurrectQuery,
292 comment.CID,
293 comment.CommenterDID,
294 comment.RootURI,
295 comment.RootCID,
296 comment.ParentURI,
297 comment.ParentCID,
298 comment.Content,
299 comment.ContentFacets,
300 comment.Embed,
301 comment.ContentLabels,
302 pq.Array(comment.Langs),
303 comment.CreatedAt,
304 time.Now(),
305 commentID,
306 )
307 if err != nil {
308 return fmt.Errorf("failed to resurrect comment: %w", err)
309 }
310
311 } else if checkErr == sql.ErrNoRows {
312 // Comment doesn't exist - insert new comment
313 insertQuery := `
314 INSERT INTO comments (
315 uri, cid, rkey, commenter_did,
316 root_uri, root_cid, parent_uri, parent_cid,
317 content, content_facets, embed, content_labels, langs,
318 created_at, indexed_at
319 ) VALUES (
320 $1, $2, $3, $4,
321 $5, $6, $7, $8,
322 $9, $10, $11, $12, $13,
323 $14, $15
324 )
325 RETURNING id
326 `
327
328 err = tx.QueryRowContext(
329 ctx, insertQuery,
330 comment.URI, comment.CID, comment.RKey, comment.CommenterDID,
331 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID,
332 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs),
333 comment.CreatedAt, time.Now(),
334 ).Scan(&commentID)
335 if err != nil {
336 return fmt.Errorf("failed to insert comment: %w", err)
337 }
338
339 } else {
340 // Unexpected error checking for existing comment
341 return fmt.Errorf("failed to check for existing comment: %w", checkErr)
342 }
343
344 // 1.5. Reconcile reply_count for this newly inserted comment
345 // In case any replies arrived out-of-order before this parent was indexed
346 reconcileQuery := `
347 UPDATE comments
348 SET reply_count = (
349 SELECT COUNT(*)
350 FROM comments c
351 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL
352 )
353 WHERE id = $2
354 `
355 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, comment.URI, commentID)
356 if reconcileErr != nil {
357 log.Printf("Warning: Failed to reconcile reply_count for %s: %v", comment.URI, reconcileErr)
358 // Continue anyway - this is a best-effort reconciliation
359 }
360
361 // 2. Update parent counts atomically
362 // Parent could be a post (increment comment_count) or a comment (increment reply_count)
363 // Parse collection from parent URI to determine target table
364 //
365 // NOTE: Post comment_count reconciliation IS implemented in post_consumer.go:210-226
366 // When a comment arrives before its parent post, the post update below returns 0 rows
367 // and we log a warning. Later, when the post is indexed, the post consumer reconciles
368 // comment_count by counting all pre-existing comments. This ensures accurate counts
369 // despite out-of-order Jetstream event delivery.
370 //
371 // Test coverage: TestPostConsumer_CommentCountReconciliation in post_consumer_test.go
372 collection := utils.ExtractCollectionFromURI(comment.ParentURI)
373
374 var updateQuery string
375 switch collection {
376 case "social.coves.community.post":
377 // Comment on post - update posts.comment_count
378 updateQuery = `
379 UPDATE posts
380 SET comment_count = comment_count + 1
381 WHERE uri = $1 AND deleted_at IS NULL
382 `
383
384 case "social.coves.community.comment":
385 // Reply to comment - update comments.reply_count
386 updateQuery = `
387 UPDATE comments
388 SET reply_count = reply_count + 1
389 WHERE uri = $1 AND deleted_at IS NULL
390 `
391
392 default:
393 // Unknown or unsupported parent collection
394 // Comment is still indexed, we just don't update parent counts
395 log.Printf("Comment parent has unsupported collection: %s (comment indexed, parent count not updated)", collection)
396 if commitErr := tx.Commit(); commitErr != nil {
397 return fmt.Errorf("failed to commit transaction: %w", commitErr)
398 }
399 return nil
400 }
401
402 result, err := tx.ExecContext(ctx, updateQuery, comment.ParentURI)
403 if err != nil {
404 return fmt.Errorf("failed to update parent count: %w", err)
405 }
406
407 rowsAffected, err := result.RowsAffected()
408 if err != nil {
409 return fmt.Errorf("failed to check update result: %w", err)
410 }
411
412 // If parent not found, that's OK (parent might not be indexed yet)
413 if rowsAffected == 0 {
414 log.Printf("Warning: Parent not found or deleted: %s (comment indexed anyway)", comment.ParentURI)
415 }
416
417 // Commit transaction
418 if err := tx.Commit(); err != nil {
419 return fmt.Errorf("failed to commit transaction: %w", err)
420 }
421
422 return nil
423}
424
425// deleteCommentAndUpdateCounts atomically soft-deletes a comment and updates parent counts
426// Blanks content to preserve thread structure while respecting user privacy
427// The comment remains in the database but is shown as "[deleted]" in thread views
428func (c *CommentEventConsumer) deleteCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error {
429 tx, err := c.db.BeginTx(ctx, nil)
430 if err != nil {
431 return fmt.Errorf("failed to begin transaction: %w", err)
432 }
433 defer func() {
434 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
435 log.Printf("Failed to rollback transaction: %v", rollbackErr)
436 }
437 }()
438
439 // 1. Soft-delete the comment: blank content but preserve structure
440 // DELETE event from Jetstream = author deleted their own comment
441 // Content is blanked to respect user privacy while preserving thread structure
442 // Use the repository's transaction-aware method for DRY
443 repoTx, ok := c.commentRepo.(comments.RepositoryTx)
444 if !ok {
445 return fmt.Errorf("comment repository does not support transactional operations")
446 }
447
448 rowsAffected, err := repoTx.SoftDeleteWithReasonTx(ctx, tx, comment.URI, comments.DeletionReasonAuthor, comment.CommenterDID)
449 if err != nil {
450 return fmt.Errorf("failed to delete comment: %w", err)
451 }
452
453 // Idempotent: If no rows affected, comment already deleted
454 if rowsAffected == 0 {
455 log.Printf("Comment already deleted: %s (idempotent)", comment.URI)
456 if commitErr := tx.Commit(); commitErr != nil {
457 return fmt.Errorf("failed to commit transaction: %w", commitErr)
458 }
459 return nil
460 }
461
462 // 2. Decrement parent counts atomically
463 // Parent could be a post or comment - parse collection to determine target table
464 collection := utils.ExtractCollectionFromURI(comment.ParentURI)
465
466 var updateQuery string
467 var result sql.Result
468 switch collection {
469 case "social.coves.community.post":
470 // Comment on post - decrement posts.comment_count
471 updateQuery = `
472 UPDATE posts
473 SET comment_count = GREATEST(0, comment_count - 1)
474 WHERE uri = $1 AND deleted_at IS NULL
475 `
476
477 case "social.coves.community.comment":
478 // Reply to comment - decrement comments.reply_count
479 updateQuery = `
480 UPDATE comments
481 SET reply_count = GREATEST(0, reply_count - 1)
482 WHERE uri = $1 AND deleted_at IS NULL
483 `
484
485 default:
486 // Unknown or unsupported parent collection
487 // Comment is still deleted, we just don't update parent counts
488 log.Printf("Comment parent has unsupported collection: %s (comment deleted, parent count not updated)", collection)
489 if commitErr := tx.Commit(); commitErr != nil {
490 return fmt.Errorf("failed to commit transaction: %w", commitErr)
491 }
492 return nil
493 }
494
495 result, err = tx.ExecContext(ctx, updateQuery, comment.ParentURI)
496 if err != nil {
497 return fmt.Errorf("failed to update parent count: %w", err)
498 }
499
500 rowsAffected, err = result.RowsAffected()
501 if err != nil {
502 return fmt.Errorf("failed to check update result: %w", err)
503 }
504
505 // If parent not found, that's OK (parent might be deleted)
506 if rowsAffected == 0 {
507 log.Printf("Warning: Parent not found or deleted: %s (comment deleted anyway)", comment.ParentURI)
508 }
509
510 // Commit transaction
511 if err := tx.Commit(); err != nil {
512 return fmt.Errorf("failed to commit transaction: %w", err)
513 }
514
515 return nil
516}
517
518// validateCommentEvent performs security validation on comment events
519func (c *CommentEventConsumer) validateCommentEvent(ctx context.Context, repoDID string, comment *CommentRecordFromJetstream) error {
520 // SECURITY: Comments MUST come from user repositories (repo owner = commenter DID)
521 // The repository owner (repoDID) IS the commenter - comments are stored in user repos.
522 //
523 // We do NOT check if the user exists in AppView because:
524 // 1. Comment events may arrive before user events in Jetstream (race condition)
525 // 2. The comment came from the user's PDS repository (authenticated by PDS)
526 // 3. The database FK constraint was removed to allow out-of-order indexing
527 // 4. Orphaned comments (from never-indexed users) are harmless
528 //
529 // Security is maintained because:
530 // - Comment must come from user's own PDS repository (verified by atProto)
531 // - Fake DIDs will fail PDS authentication
532
533 // Validate DID format (basic sanity check)
534 if !strings.HasPrefix(repoDID, "did:") {
535 return fmt.Errorf("invalid commenter DID format: %s", repoDID)
536 }
537
538 // Validate content is not empty (required per lexicon)
539 if comment.Content == "" {
540 return fmt.Errorf("comment content is required")
541 }
542
543 // Validate content length (defensive check - PDS should enforce this)
544 // Per lexicon: max 3000 graphemes, ~30000 bytes
545 // We check bytes as a simple defensive measure
546 if len(comment.Content) > MaxCommentContentBytes {
547 return fmt.Errorf("comment content exceeds maximum length (%d bytes): got %d bytes", MaxCommentContentBytes, len(comment.Content))
548 }
549
550 // Validate reply references exist
551 if comment.Reply.Root.URI == "" || comment.Reply.Root.CID == "" {
552 return fmt.Errorf("invalid root reference: must have both URI and CID")
553 }
554
555 if comment.Reply.Parent.URI == "" || comment.Reply.Parent.CID == "" {
556 return fmt.Errorf("invalid parent reference: must have both URI and CID")
557 }
558
559 // Validate AT-URI structure for root and parent
560 if err := validateATURI(comment.Reply.Root.URI); err != nil {
561 return fmt.Errorf("invalid root URI: %w", err)
562 }
563
564 if err := validateATURI(comment.Reply.Parent.URI); err != nil {
565 return fmt.Errorf("invalid parent URI: %w", err)
566 }
567
568 return nil
569}
570
571// validateATURI performs basic structure validation on AT-URIs
572// Format: at://did:method:id/collection/rkey
573// This is defensive validation - we trust PDS but catch obviously malformed URIs
574func validateATURI(uri string) error {
575 if !strings.HasPrefix(uri, ATProtoScheme) {
576 return fmt.Errorf("must start with %s", ATProtoScheme)
577 }
578
579 // Remove at:// prefix and split by /
580 withoutScheme := strings.TrimPrefix(uri, ATProtoScheme)
581 parts := strings.Split(withoutScheme, "/")
582
583 // Must have at least 3 parts: did, collection, rkey
584 if len(parts) < 3 {
585 return fmt.Errorf("invalid structure (expected at://did/collection/rkey)")
586 }
587
588 // First part should be a DID
589 if !strings.HasPrefix(parts[0], "did:") {
590 return fmt.Errorf("repository identifier must be a DID")
591 }
592
593 // Collection and rkey should not be empty
594 if parts[1] == "" || parts[2] == "" {
595 return fmt.Errorf("collection and rkey cannot be empty")
596 }
597
598 return nil
599}
600
601// CommentRecordFromJetstream represents a comment record as received from Jetstream
602// Matches social.coves.community.comment lexicon
603type CommentRecordFromJetstream struct {
604 Labels interface{} `json:"labels,omitempty"`
605 Embed map[string]interface{} `json:"embed,omitempty"`
606 Reply ReplyRefFromJetstream `json:"reply"`
607 Type string `json:"$type"`
608 Content string `json:"content"`
609 CreatedAt string `json:"createdAt"`
610 Facets []interface{} `json:"facets,omitempty"`
611 Langs []string `json:"langs,omitempty"`
612}
613
614// ReplyRefFromJetstream represents the threading structure
615type ReplyRefFromJetstream struct {
616 Root StrongRefFromJetstream `json:"root"`
617 Parent StrongRefFromJetstream `json:"parent"`
618}
619
620// parseCommentRecord parses a comment record from Jetstream event data
621func parseCommentRecord(record map[string]interface{}) (*CommentRecordFromJetstream, error) {
622 // Marshal to JSON and back for proper type conversion
623 recordJSON, err := json.Marshal(record)
624 if err != nil {
625 return nil, fmt.Errorf("failed to marshal record: %w", err)
626 }
627
628 var comment CommentRecordFromJetstream
629 if err := json.Unmarshal(recordJSON, &comment); err != nil {
630 return nil, fmt.Errorf("failed to unmarshal comment record: %w", err)
631 }
632
633 // Validate required fields
634 if comment.Content == "" {
635 return nil, fmt.Errorf("comment record missing content field")
636 }
637
638 if comment.CreatedAt == "" {
639 return nil, fmt.Errorf("comment record missing createdAt field")
640 }
641
642 return &comment, nil
643}
644
645// serializeOptionalFields serializes facets, embed, and labels from a comment record to JSON strings
646// Returns nil pointers for empty/nil fields (DRY helper to avoid duplication)
647func serializeOptionalFields(commentRecord *CommentRecordFromJetstream) (facetsJSON, embedJSON, labelsJSON *string) {
648 // Serialize facets if present
649 if len(commentRecord.Facets) > 0 {
650 if facetsBytes, err := json.Marshal(commentRecord.Facets); err == nil {
651 facetsStr := string(facetsBytes)
652 facetsJSON = &facetsStr
653 }
654 }
655
656 // Serialize embed if present
657 if len(commentRecord.Embed) > 0 {
658 if embedBytes, err := json.Marshal(commentRecord.Embed); err == nil {
659 embedStr := string(embedBytes)
660 embedJSON = &embedStr
661 }
662 }
663
664 // Serialize labels if present
665 if commentRecord.Labels != nil {
666 if labelsBytes, err := json.Marshal(commentRecord.Labels); err == nil {
667 labelsStr := string(labelsBytes)
668 labelsJSON = &labelsStr
669 }
670 }
671
672 return facetsJSON, embedJSON, labelsJSON
673}