A community based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "Coves/internal/atproto/utils"
5 "Coves/internal/core/comments"
6 "context"
7 "database/sql"
8 "encoding/json"
9 "fmt"
10 "log"
11 "strings"
12 "time"
13
14 "github.com/lib/pq"
15)
16
17// Constants for comment validation and processing
18const (
19 // CommentCollection is the lexicon collection identifier for comments
20 CommentCollection = "social.coves.community.comment"
21
22 // ATProtoScheme is the URI scheme for atProto AT-URIs
23 ATProtoScheme = "at://"
24
25 // MaxCommentContentBytes is the maximum allowed size for comment content
26 // Per lexicon: max 3000 graphemes, ~30000 bytes
27 MaxCommentContentBytes = 30000
28)
29
30// CommentEventConsumer consumes comment-related events from Jetstream
31// Handles CREATE, UPDATE, and DELETE operations for social.coves.community.comment
32type CommentEventConsumer struct {
33 commentRepo comments.Repository
34 db *sql.DB // Direct DB access for atomic count updates
35}
36
37// NewCommentEventConsumer creates a new Jetstream consumer for comment events
38func NewCommentEventConsumer(
39 commentRepo comments.Repository,
40 db *sql.DB,
41) *CommentEventConsumer {
42 return &CommentEventConsumer{
43 commentRepo: commentRepo,
44 db: db,
45 }
46}
47
48// HandleEvent processes a Jetstream event for comment records
49func (c *CommentEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
50 // We only care about commit events for comment records
51 if event.Kind != "commit" || event.Commit == nil {
52 return nil
53 }
54
55 commit := event.Commit
56
57 // Handle comment record operations
58 if commit.Collection == CommentCollection {
59 switch commit.Operation {
60 case "create":
61 return c.createComment(ctx, event.Did, commit)
62 case "update":
63 return c.updateComment(ctx, event.Did, commit)
64 case "delete":
65 return c.deleteComment(ctx, event.Did, commit)
66 }
67 }
68
69 // Silently ignore other operations and collections
70 return nil
71}
72
73// createComment indexes a new comment from the firehose and updates parent counts
74func (c *CommentEventConsumer) createComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
75 if commit.Record == nil {
76 return fmt.Errorf("comment create event missing record data")
77 }
78
79 // Parse the comment record
80 commentRecord, err := parseCommentRecord(commit.Record)
81 if err != nil {
82 return fmt.Errorf("failed to parse comment record: %w", err)
83 }
84
85 // SECURITY: Validate this is a legitimate comment event
86 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil {
87 log.Printf("🚨 SECURITY: Rejecting comment event: %v", err)
88 return err
89 }
90
91 // Build AT-URI for this comment
92 // Format: at://commenter_did/social.coves.community.comment/rkey
93 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey)
94
95 // Parse timestamp from record
96 createdAt, err := time.Parse(time.RFC3339, commentRecord.CreatedAt)
97 if err != nil {
98 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err)
99 createdAt = time.Now()
100 }
101
102 // Serialize optional JSON fields
103 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord)
104
105 // Build comment entity
106 comment := &comments.Comment{
107 URI: uri,
108 CID: commit.CID,
109 RKey: commit.RKey,
110 CommenterDID: repoDID, // Comment comes from user's repository
111 RootURI: commentRecord.Reply.Root.URI,
112 RootCID: commentRecord.Reply.Root.CID,
113 ParentURI: commentRecord.Reply.Parent.URI,
114 ParentCID: commentRecord.Reply.Parent.CID,
115 Content: commentRecord.Content,
116 ContentFacets: facetsJSON,
117 Embed: embedJSON,
118 ContentLabels: labelsJSON,
119 Langs: commentRecord.Langs,
120 CreatedAt: createdAt,
121 IndexedAt: time.Now(),
122 }
123
124 // Atomically: Index comment + Update parent counts
125 if err := c.indexCommentAndUpdateCounts(ctx, comment); err != nil {
126 return fmt.Errorf("failed to index comment and update counts: %w", err)
127 }
128
129 log.Printf("✓ Indexed comment: %s (on %s)", uri, comment.ParentURI)
130 return nil
131}
132
133// updateComment updates an existing comment's content fields
134func (c *CommentEventConsumer) updateComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
135 if commit.Record == nil {
136 return fmt.Errorf("comment update event missing record data")
137 }
138
139 // Parse the updated comment record
140 commentRecord, err := parseCommentRecord(commit.Record)
141 if err != nil {
142 return fmt.Errorf("failed to parse comment record: %w", err)
143 }
144
145 // SECURITY: Validate this is a legitimate update
146 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil {
147 log.Printf("🚨 SECURITY: Rejecting comment update: %v", err)
148 return err
149 }
150
151 // Build AT-URI for the comment being updated
152 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey)
153
154 // Fetch existing comment to validate threading references are immutable
155 existingComment, err := c.commentRepo.GetByURI(ctx, uri)
156 if err != nil {
157 if err == comments.ErrCommentNotFound {
158 // Comment doesn't exist yet - might arrive out of order
159 log.Printf("Warning: Update event for non-existent comment: %s (will be indexed on CREATE)", uri)
160 return nil
161 }
162 return fmt.Errorf("failed to get existing comment for validation: %w", err)
163 }
164
165 // SECURITY: Threading references are IMMUTABLE after creation
166 // Reject updates that attempt to change root/parent (prevents thread hijacking)
167 if existingComment.RootURI != commentRecord.Reply.Root.URI ||
168 existingComment.RootCID != commentRecord.Reply.Root.CID ||
169 existingComment.ParentURI != commentRecord.Reply.Parent.URI ||
170 existingComment.ParentCID != commentRecord.Reply.Parent.CID {
171 log.Printf("🚨 SECURITY: Rejecting comment update - threading references are immutable: %s", uri)
172 log.Printf(" Existing root: %s (CID: %s)", existingComment.RootURI, existingComment.RootCID)
173 log.Printf(" Incoming root: %s (CID: %s)", commentRecord.Reply.Root.URI, commentRecord.Reply.Root.CID)
174 log.Printf(" Existing parent: %s (CID: %s)", existingComment.ParentURI, existingComment.ParentCID)
175 log.Printf(" Incoming parent: %s (CID: %s)", commentRecord.Reply.Parent.URI, commentRecord.Reply.Parent.CID)
176 return fmt.Errorf("comment threading references cannot be changed after creation")
177 }
178
179 // Serialize optional JSON fields
180 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord)
181
182 // Build comment update entity (preserves vote counts and created_at)
183 comment := &comments.Comment{
184 URI: uri,
185 CID: commit.CID,
186 Content: commentRecord.Content,
187 ContentFacets: facetsJSON,
188 Embed: embedJSON,
189 ContentLabels: labelsJSON,
190 Langs: commentRecord.Langs,
191 }
192
193 // Update the comment in repository
194 if err := c.commentRepo.Update(ctx, comment); err != nil {
195 return fmt.Errorf("failed to update comment: %w", err)
196 }
197
198 log.Printf("✓ Updated comment: %s", uri)
199 return nil
200}
201
202// deleteComment soft-deletes a comment and updates parent counts
203func (c *CommentEventConsumer) deleteComment(ctx context.Context, repoDID string, commit *CommitEvent) error {
204 // Build AT-URI for the comment being deleted
205 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey)
206
207 // Get existing comment to know its parent (for decrementing the right counter)
208 existingComment, err := c.commentRepo.GetByURI(ctx, uri)
209 if err != nil {
210 if err == comments.ErrCommentNotFound {
211 // Idempotent: Comment already deleted or never existed
212 log.Printf("Comment already deleted or not found: %s", uri)
213 return nil
214 }
215 return fmt.Errorf("failed to get existing comment: %w", err)
216 }
217
218 // Atomically: Soft-delete comment + Update parent counts
219 if err := c.deleteCommentAndUpdateCounts(ctx, existingComment); err != nil {
220 return fmt.Errorf("failed to delete comment and update counts: %w", err)
221 }
222
223 log.Printf("✓ Deleted comment: %s", uri)
224 return nil
225}
226
227// indexCommentAndUpdateCounts atomically indexes a comment and updates parent counts
228func (c *CommentEventConsumer) indexCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error {
229 tx, err := c.db.BeginTx(ctx, nil)
230 if err != nil {
231 return fmt.Errorf("failed to begin transaction: %w", err)
232 }
233 defer func() {
234 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
235 log.Printf("Failed to rollback transaction: %v", rollbackErr)
236 }
237 }()
238
239 // 1. Check if comment exists and handle resurrection case
240 // In atProto, deleted records' rkeys become available - users can recreate with same rkey
241 // We must distinguish: idempotent replay (skip) vs resurrection (update + restore counts)
242 var existingID int64
243 var existingDeletedAt *time.Time
244 checkQuery := `SELECT id, deleted_at FROM comments WHERE uri = $1`
245 checkErr := tx.QueryRowContext(ctx, checkQuery, comment.URI).Scan(&existingID, &existingDeletedAt)
246
247 var commentID int64
248
249 if checkErr == nil {
250 // Comment exists
251 if existingDeletedAt == nil {
252 // Not deleted - this is an idempotent replay, skip gracefully
253 log.Printf("Comment already indexed: %s (idempotent replay)", comment.URI)
254 if commitErr := tx.Commit(); commitErr != nil {
255 return fmt.Errorf("failed to commit transaction: %w", commitErr)
256 }
257 return nil
258 }
259
260 // Comment was soft-deleted, now being recreated (resurrection)
261 // This is a NEW record with same rkey - update ALL fields including threading refs
262 // User may have deleted old comment and created a new one on a different parent/root
263 // Clear deletion metadata to restore the comment
264 log.Printf("Resurrecting previously deleted comment: %s", comment.URI)
265 commentID = existingID
266
267 resurrectQuery := `
268 UPDATE comments
269 SET
270 cid = $1,
271 commenter_did = $2,
272 root_uri = $3,
273 root_cid = $4,
274 parent_uri = $5,
275 parent_cid = $6,
276 content = $7,
277 content_facets = $8,
278 embed = $9,
279 content_labels = $10,
280 langs = $11,
281 created_at = $12,
282 indexed_at = $13,
283 deleted_at = NULL,
284 deletion_reason = NULL,
285 deleted_by = NULL,
286 reply_count = 0
287 WHERE id = $14
288 `
289
290 _, err = tx.ExecContext(
291 ctx, resurrectQuery,
292 comment.CID,
293 comment.CommenterDID,
294 comment.RootURI,
295 comment.RootCID,
296 comment.ParentURI,
297 comment.ParentCID,
298 comment.Content,
299 comment.ContentFacets,
300 comment.Embed,
301 comment.ContentLabels,
302 pq.Array(comment.Langs),
303 comment.CreatedAt,
304 time.Now(),
305 commentID,
306 )
307 if err != nil {
308 return fmt.Errorf("failed to resurrect comment: %w", err)
309 }
310
311 } else if checkErr == sql.ErrNoRows {
312 // Comment doesn't exist - insert new comment
313 // Use ON CONFLICT DO NOTHING to handle race conditions gracefully
314 // (e.g., duplicate Jetstream events from reconnections/retries)
315 insertQuery := `
316 INSERT INTO comments (
317 uri, cid, rkey, commenter_did,
318 root_uri, root_cid, parent_uri, parent_cid,
319 content, content_facets, embed, content_labels, langs,
320 created_at, indexed_at
321 ) VALUES (
322 $1, $2, $3, $4,
323 $5, $6, $7, $8,
324 $9, $10, $11, $12, $13,
325 $14, $15
326 )
327 ON CONFLICT (uri) DO NOTHING
328 RETURNING id
329 `
330
331 err = tx.QueryRowContext(
332 ctx, insertQuery,
333 comment.URI, comment.CID, comment.RKey, comment.CommenterDID,
334 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID,
335 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs),
336 comment.CreatedAt, time.Now(),
337 ).Scan(&commentID)
338 if err == sql.ErrNoRows {
339 // ON CONFLICT triggered - comment was inserted by concurrent process
340 // This is an idempotent replay, skip gracefully
341 log.Printf("Comment already indexed (concurrent insert): %s", comment.URI)
342 if commitErr := tx.Commit(); commitErr != nil {
343 return fmt.Errorf("failed to commit transaction: %w", commitErr)
344 }
345 return nil
346 }
347 if err != nil {
348 return fmt.Errorf("failed to insert comment: %w", err)
349 }
350
351 } else {
352 // Unexpected error checking for existing comment
353 return fmt.Errorf("failed to check for existing comment: %w", checkErr)
354 }
355
356 // 1.5. Reconcile reply_count for this newly inserted comment
357 // In case any replies arrived out-of-order before this parent was indexed
358 reconcileQuery := `
359 UPDATE comments
360 SET reply_count = (
361 SELECT COUNT(*)
362 FROM comments c
363 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL
364 )
365 WHERE id = $2
366 `
367 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, comment.URI, commentID)
368 if reconcileErr != nil {
369 log.Printf("Warning: Failed to reconcile reply_count for %s: %v", comment.URI, reconcileErr)
370 // Continue anyway - this is a best-effort reconciliation
371 }
372
373 // 2. Update parent counts atomically
374 // Parent could be a post (increment comment_count) or a comment (increment reply_count)
375 // Parse collection from parent URI to determine target table
376 //
377 // NOTE: Post comment_count reconciliation IS implemented in post_consumer.go:210-226
378 // When a comment arrives before its parent post, the post update below returns 0 rows
379 // and we log a warning. Later, when the post is indexed, the post consumer reconciles
380 // comment_count by counting all pre-existing comments. This ensures accurate counts
381 // despite out-of-order Jetstream event delivery.
382 //
383 // Test coverage: TestPostConsumer_CommentCountReconciliation in post_consumer_test.go
384 collection := utils.ExtractCollectionFromURI(comment.ParentURI)
385
386 var updateQuery string
387 switch collection {
388 case "social.coves.community.post":
389 // Comment on post - update posts.comment_count
390 updateQuery = `
391 UPDATE posts
392 SET comment_count = comment_count + 1
393 WHERE uri = $1 AND deleted_at IS NULL
394 `
395
396 case "social.coves.community.comment":
397 // Reply to comment - update comments.reply_count
398 updateQuery = `
399 UPDATE comments
400 SET reply_count = reply_count + 1
401 WHERE uri = $1 AND deleted_at IS NULL
402 `
403
404 default:
405 // Unknown or unsupported parent collection
406 // Comment is still indexed, we just don't update parent counts
407 log.Printf("Comment parent has unsupported collection: %s (comment indexed, parent count not updated)", collection)
408 if commitErr := tx.Commit(); commitErr != nil {
409 return fmt.Errorf("failed to commit transaction: %w", commitErr)
410 }
411 return nil
412 }
413
414 result, err := tx.ExecContext(ctx, updateQuery, comment.ParentURI)
415 if err != nil {
416 return fmt.Errorf("failed to update parent count: %w", err)
417 }
418
419 rowsAffected, err := result.RowsAffected()
420 if err != nil {
421 return fmt.Errorf("failed to check update result: %w", err)
422 }
423
424 // If parent not found, that's OK (parent might not be indexed yet)
425 if rowsAffected == 0 {
426 log.Printf("Warning: Parent not found or deleted: %s (comment indexed anyway)", comment.ParentURI)
427 }
428
429 // Commit transaction
430 if err := tx.Commit(); err != nil {
431 return fmt.Errorf("failed to commit transaction: %w", err)
432 }
433
434 return nil
435}
436
437// deleteCommentAndUpdateCounts atomically soft-deletes a comment and updates parent counts
438// Blanks content to preserve thread structure while respecting user privacy
439// The comment remains in the database but is shown as "[deleted]" in thread views
440func (c *CommentEventConsumer) deleteCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error {
441 tx, err := c.db.BeginTx(ctx, nil)
442 if err != nil {
443 return fmt.Errorf("failed to begin transaction: %w", err)
444 }
445 defer func() {
446 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
447 log.Printf("Failed to rollback transaction: %v", rollbackErr)
448 }
449 }()
450
451 // 1. Soft-delete the comment: blank content but preserve structure
452 // DELETE event from Jetstream = author deleted their own comment
453 // Content is blanked to respect user privacy while preserving thread structure
454 // Use the repository's transaction-aware method for DRY
455 repoTx, ok := c.commentRepo.(comments.RepositoryTx)
456 if !ok {
457 return fmt.Errorf("comment repository does not support transactional operations")
458 }
459
460 rowsAffected, err := repoTx.SoftDeleteWithReasonTx(ctx, tx, comment.URI, comments.DeletionReasonAuthor, comment.CommenterDID)
461 if err != nil {
462 return fmt.Errorf("failed to delete comment: %w", err)
463 }
464
465 // Idempotent: If no rows affected, comment already deleted
466 if rowsAffected == 0 {
467 log.Printf("Comment already deleted: %s (idempotent)", comment.URI)
468 if commitErr := tx.Commit(); commitErr != nil {
469 return fmt.Errorf("failed to commit transaction: %w", commitErr)
470 }
471 return nil
472 }
473
474 // 2. Decrement parent counts atomically
475 // Parent could be a post or comment - parse collection to determine target table
476 collection := utils.ExtractCollectionFromURI(comment.ParentURI)
477
478 var updateQuery string
479 var result sql.Result
480 switch collection {
481 case "social.coves.community.post":
482 // Comment on post - decrement posts.comment_count
483 updateQuery = `
484 UPDATE posts
485 SET comment_count = GREATEST(0, comment_count - 1)
486 WHERE uri = $1 AND deleted_at IS NULL
487 `
488
489 case "social.coves.community.comment":
490 // Reply to comment - decrement comments.reply_count
491 updateQuery = `
492 UPDATE comments
493 SET reply_count = GREATEST(0, reply_count - 1)
494 WHERE uri = $1 AND deleted_at IS NULL
495 `
496
497 default:
498 // Unknown or unsupported parent collection
499 // Comment is still deleted, we just don't update parent counts
500 log.Printf("Comment parent has unsupported collection: %s (comment deleted, parent count not updated)", collection)
501 if commitErr := tx.Commit(); commitErr != nil {
502 return fmt.Errorf("failed to commit transaction: %w", commitErr)
503 }
504 return nil
505 }
506
507 result, err = tx.ExecContext(ctx, updateQuery, comment.ParentURI)
508 if err != nil {
509 return fmt.Errorf("failed to update parent count: %w", err)
510 }
511
512 rowsAffected, err = result.RowsAffected()
513 if err != nil {
514 return fmt.Errorf("failed to check update result: %w", err)
515 }
516
517 // If parent not found, that's OK (parent might be deleted)
518 if rowsAffected == 0 {
519 log.Printf("Warning: Parent not found or deleted: %s (comment deleted anyway)", comment.ParentURI)
520 }
521
522 // Commit transaction
523 if err := tx.Commit(); err != nil {
524 return fmt.Errorf("failed to commit transaction: %w", err)
525 }
526
527 return nil
528}
529
530// validateCommentEvent performs security validation on comment events
531func (c *CommentEventConsumer) validateCommentEvent(ctx context.Context, repoDID string, comment *CommentRecordFromJetstream) error {
532 // SECURITY: Comments MUST come from user repositories (repo owner = commenter DID)
533 // The repository owner (repoDID) IS the commenter - comments are stored in user repos.
534 //
535 // We do NOT check if the user exists in AppView because:
536 // 1. Comment events may arrive before user events in Jetstream (race condition)
537 // 2. The comment came from the user's PDS repository (authenticated by PDS)
538 // 3. The database FK constraint was removed to allow out-of-order indexing
539 // 4. Orphaned comments (from never-indexed users) are harmless
540 //
541 // Security is maintained because:
542 // - Comment must come from user's own PDS repository (verified by atProto)
543 // - Fake DIDs will fail PDS authentication
544
545 // Validate DID format (basic sanity check)
546 if !strings.HasPrefix(repoDID, "did:") {
547 return fmt.Errorf("invalid commenter DID format: %s", repoDID)
548 }
549
550 // Validate content is not empty (required per lexicon)
551 if comment.Content == "" {
552 return fmt.Errorf("comment content is required")
553 }
554
555 // Validate content length (defensive check - PDS should enforce this)
556 // Per lexicon: max 3000 graphemes, ~30000 bytes
557 // We check bytes as a simple defensive measure
558 if len(comment.Content) > MaxCommentContentBytes {
559 return fmt.Errorf("comment content exceeds maximum length (%d bytes): got %d bytes", MaxCommentContentBytes, len(comment.Content))
560 }
561
562 // Validate reply references exist
563 if comment.Reply.Root.URI == "" || comment.Reply.Root.CID == "" {
564 return fmt.Errorf("invalid root reference: must have both URI and CID")
565 }
566
567 if comment.Reply.Parent.URI == "" || comment.Reply.Parent.CID == "" {
568 return fmt.Errorf("invalid parent reference: must have both URI and CID")
569 }
570
571 // Validate AT-URI structure for root and parent
572 if err := validateATURI(comment.Reply.Root.URI); err != nil {
573 return fmt.Errorf("invalid root URI: %w", err)
574 }
575
576 if err := validateATURI(comment.Reply.Parent.URI); err != nil {
577 return fmt.Errorf("invalid parent URI: %w", err)
578 }
579
580 return nil
581}
582
583// validateATURI performs basic structure validation on AT-URIs
584// Format: at://did:method:id/collection/rkey
585// This is defensive validation - we trust PDS but catch obviously malformed URIs
586func validateATURI(uri string) error {
587 if !strings.HasPrefix(uri, ATProtoScheme) {
588 return fmt.Errorf("must start with %s", ATProtoScheme)
589 }
590
591 // Remove at:// prefix and split by /
592 withoutScheme := strings.TrimPrefix(uri, ATProtoScheme)
593 parts := strings.Split(withoutScheme, "/")
594
595 // Must have at least 3 parts: did, collection, rkey
596 if len(parts) < 3 {
597 return fmt.Errorf("invalid structure (expected at://did/collection/rkey)")
598 }
599
600 // First part should be a DID
601 if !strings.HasPrefix(parts[0], "did:") {
602 return fmt.Errorf("repository identifier must be a DID")
603 }
604
605 // Collection and rkey should not be empty
606 if parts[1] == "" || parts[2] == "" {
607 return fmt.Errorf("collection and rkey cannot be empty")
608 }
609
610 return nil
611}
612
613// CommentRecordFromJetstream represents a comment record as received from Jetstream
614// Matches social.coves.community.comment lexicon
615type CommentRecordFromJetstream struct {
616 Labels interface{} `json:"labels,omitempty"`
617 Embed map[string]interface{} `json:"embed,omitempty"`
618 Reply ReplyRefFromJetstream `json:"reply"`
619 Type string `json:"$type"`
620 Content string `json:"content"`
621 CreatedAt string `json:"createdAt"`
622 Facets []interface{} `json:"facets,omitempty"`
623 Langs []string `json:"langs,omitempty"`
624}
625
626// ReplyRefFromJetstream represents the threading structure
627type ReplyRefFromJetstream struct {
628 Root StrongRefFromJetstream `json:"root"`
629 Parent StrongRefFromJetstream `json:"parent"`
630}
631
632// parseCommentRecord parses a comment record from Jetstream event data
633func parseCommentRecord(record map[string]interface{}) (*CommentRecordFromJetstream, error) {
634 // Marshal to JSON and back for proper type conversion
635 recordJSON, err := json.Marshal(record)
636 if err != nil {
637 return nil, fmt.Errorf("failed to marshal record: %w", err)
638 }
639
640 var comment CommentRecordFromJetstream
641 if err := json.Unmarshal(recordJSON, &comment); err != nil {
642 return nil, fmt.Errorf("failed to unmarshal comment record: %w", err)
643 }
644
645 // Validate required fields
646 if comment.Content == "" {
647 return nil, fmt.Errorf("comment record missing content field")
648 }
649
650 if comment.CreatedAt == "" {
651 return nil, fmt.Errorf("comment record missing createdAt field")
652 }
653
654 return &comment, nil
655}
656
657// serializeOptionalFields serializes facets, embed, and labels from a comment record to JSON strings
658// Returns nil pointers for empty/nil fields (DRY helper to avoid duplication)
659func serializeOptionalFields(commentRecord *CommentRecordFromJetstream) (facetsJSON, embedJSON, labelsJSON *string) {
660 // Serialize facets if present
661 if len(commentRecord.Facets) > 0 {
662 if facetsBytes, err := json.Marshal(commentRecord.Facets); err == nil {
663 facetsStr := string(facetsBytes)
664 facetsJSON = &facetsStr
665 }
666 }
667
668 // Serialize embed if present
669 if len(commentRecord.Embed) > 0 {
670 if embedBytes, err := json.Marshal(commentRecord.Embed); err == nil {
671 embedStr := string(embedBytes)
672 embedJSON = &embedStr
673 }
674 }
675
676 // Serialize labels if present
677 if commentRecord.Labels != nil {
678 if labelsBytes, err := json.Marshal(commentRecord.Labels); err == nil {
679 labelsStr := string(labelsBytes)
680 labelsJSON = &labelsStr
681 }
682 }
683
684 return facetsJSON, embedJSON, labelsJSON
685}