A community-based topic aggregation platform built on atproto
at main 24 kB view raw
1package jetstream 2 3import ( 4 "Coves/internal/atproto/utils" 5 "Coves/internal/core/comments" 6 "context" 7 "database/sql" 8 "encoding/json" 9 "fmt" 10 "log" 11 "strings" 12 "time" 13 14 "github.com/lib/pq" 15) 16 17// Constants for comment validation and processing 18const ( 19 // CommentCollection is the lexicon collection identifier for comments 20 CommentCollection = "social.coves.community.comment" 21 22 // ATProtoScheme is the URI scheme for atProto AT-URIs 23 ATProtoScheme = "at://" 24 25 // MaxCommentContentBytes is the maximum allowed size for comment content 26 // Per lexicon: max 3000 graphemes, ~30000 bytes 27 MaxCommentContentBytes = 30000 28) 29 30// CommentEventConsumer consumes comment-related events from Jetstream 31// Handles CREATE, UPDATE, and DELETE operations for social.coves.community.comment 32type CommentEventConsumer struct { 33 commentRepo comments.Repository 34 db *sql.DB // Direct DB access for atomic count updates 35} 36 37// NewCommentEventConsumer creates a new Jetstream consumer for comment events 38func NewCommentEventConsumer( 39 commentRepo comments.Repository, 40 db *sql.DB, 41) *CommentEventConsumer { 42 return &CommentEventConsumer{ 43 commentRepo: commentRepo, 44 db: db, 45 } 46} 47 48// HandleEvent processes a Jetstream event for comment records 49func (c *CommentEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 50 // We only care about commit events for comment records 51 if event.Kind != "commit" || event.Commit == nil { 52 return nil 53 } 54 55 commit := event.Commit 56 57 // Handle comment record operations 58 if commit.Collection == CommentCollection { 59 switch commit.Operation { 60 case "create": 61 return c.createComment(ctx, event.Did, commit) 62 case "update": 63 return c.updateComment(ctx, event.Did, commit) 64 case "delete": 65 return c.deleteComment(ctx, event.Did, commit) 66 } 67 } 68 69 // Silently ignore other operations and collections 70 return nil 71} 72 73// createComment 
indexes a new comment from the firehose and updates parent counts 74func (c *CommentEventConsumer) createComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 75 if commit.Record == nil { 76 return fmt.Errorf("comment create event missing record data") 77 } 78 79 // Parse the comment record 80 commentRecord, err := parseCommentRecord(commit.Record) 81 if err != nil { 82 return fmt.Errorf("failed to parse comment record: %w", err) 83 } 84 85 // SECURITY: Validate this is a legitimate comment event 86 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil { 87 log.Printf("🚨 SECURITY: Rejecting comment event: %v", err) 88 return err 89 } 90 91 // Build AT-URI for this comment 92 // Format: at://commenter_did/social.coves.community.comment/rkey 93 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey) 94 95 // Parse timestamp from record 96 createdAt, err := time.Parse(time.RFC3339, commentRecord.CreatedAt) 97 if err != nil { 98 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err) 99 createdAt = time.Now() 100 } 101 102 // Serialize optional JSON fields 103 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord) 104 105 // Build comment entity 106 comment := &comments.Comment{ 107 URI: uri, 108 CID: commit.CID, 109 RKey: commit.RKey, 110 CommenterDID: repoDID, // Comment comes from user's repository 111 RootURI: commentRecord.Reply.Root.URI, 112 RootCID: commentRecord.Reply.Root.CID, 113 ParentURI: commentRecord.Reply.Parent.URI, 114 ParentCID: commentRecord.Reply.Parent.CID, 115 Content: commentRecord.Content, 116 ContentFacets: facetsJSON, 117 Embed: embedJSON, 118 ContentLabels: labelsJSON, 119 Langs: commentRecord.Langs, 120 CreatedAt: createdAt, 121 IndexedAt: time.Now(), 122 } 123 124 // Atomically: Index comment + Update parent counts 125 if err := c.indexCommentAndUpdateCounts(ctx, comment); err != nil { 126 return fmt.Errorf("failed to 
index comment and update counts: %w", err) 127 } 128 129 log.Printf("✓ Indexed comment: %s (on %s)", uri, comment.ParentURI) 130 return nil 131} 132 133// updateComment updates an existing comment's content fields 134func (c *CommentEventConsumer) updateComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 135 if commit.Record == nil { 136 return fmt.Errorf("comment update event missing record data") 137 } 138 139 // Parse the updated comment record 140 commentRecord, err := parseCommentRecord(commit.Record) 141 if err != nil { 142 return fmt.Errorf("failed to parse comment record: %w", err) 143 } 144 145 // SECURITY: Validate this is a legitimate update 146 if err := c.validateCommentEvent(ctx, repoDID, commentRecord); err != nil { 147 log.Printf("🚨 SECURITY: Rejecting comment update: %v", err) 148 return err 149 } 150 151 // Build AT-URI for the comment being updated 152 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey) 153 154 // Fetch existing comment to validate threading references are immutable 155 existingComment, err := c.commentRepo.GetByURI(ctx, uri) 156 if err != nil { 157 if err == comments.ErrCommentNotFound { 158 // Comment doesn't exist yet - might arrive out of order 159 log.Printf("Warning: Update event for non-existent comment: %s (will be indexed on CREATE)", uri) 160 return nil 161 } 162 return fmt.Errorf("failed to get existing comment for validation: %w", err) 163 } 164 165 // SECURITY: Threading references are IMMUTABLE after creation 166 // Reject updates that attempt to change root/parent (prevents thread hijacking) 167 if existingComment.RootURI != commentRecord.Reply.Root.URI || 168 existingComment.RootCID != commentRecord.Reply.Root.CID || 169 existingComment.ParentURI != commentRecord.Reply.Parent.URI || 170 existingComment.ParentCID != commentRecord.Reply.Parent.CID { 171 log.Printf("🚨 SECURITY: Rejecting comment update - threading references are immutable: %s", uri) 172 
log.Printf(" Existing root: %s (CID: %s)", existingComment.RootURI, existingComment.RootCID) 173 log.Printf(" Incoming root: %s (CID: %s)", commentRecord.Reply.Root.URI, commentRecord.Reply.Root.CID) 174 log.Printf(" Existing parent: %s (CID: %s)", existingComment.ParentURI, existingComment.ParentCID) 175 log.Printf(" Incoming parent: %s (CID: %s)", commentRecord.Reply.Parent.URI, commentRecord.Reply.Parent.CID) 176 return fmt.Errorf("comment threading references cannot be changed after creation") 177 } 178 179 // Serialize optional JSON fields 180 facetsJSON, embedJSON, labelsJSON := serializeOptionalFields(commentRecord) 181 182 // Build comment update entity (preserves vote counts and created_at) 183 comment := &comments.Comment{ 184 URI: uri, 185 CID: commit.CID, 186 Content: commentRecord.Content, 187 ContentFacets: facetsJSON, 188 Embed: embedJSON, 189 ContentLabels: labelsJSON, 190 Langs: commentRecord.Langs, 191 } 192 193 // Update the comment in repository 194 if err := c.commentRepo.Update(ctx, comment); err != nil { 195 return fmt.Errorf("failed to update comment: %w", err) 196 } 197 198 log.Printf("✓ Updated comment: %s", uri) 199 return nil 200} 201 202// deleteComment soft-deletes a comment and updates parent counts 203func (c *CommentEventConsumer) deleteComment(ctx context.Context, repoDID string, commit *CommitEvent) error { 204 // Build AT-URI for the comment being deleted 205 uri := fmt.Sprintf("at://%s/social.coves.community.comment/%s", repoDID, commit.RKey) 206 207 // Get existing comment to know its parent (for decrementing the right counter) 208 existingComment, err := c.commentRepo.GetByURI(ctx, uri) 209 if err != nil { 210 if err == comments.ErrCommentNotFound { 211 // Idempotent: Comment already deleted or never existed 212 log.Printf("Comment already deleted or not found: %s", uri) 213 return nil 214 } 215 return fmt.Errorf("failed to get existing comment: %w", err) 216 } 217 218 // Atomically: Soft-delete comment + Update parent counts 
219 if err := c.deleteCommentAndUpdateCounts(ctx, existingComment); err != nil { 220 return fmt.Errorf("failed to delete comment and update counts: %w", err) 221 } 222 223 log.Printf("✓ Deleted comment: %s", uri) 224 return nil 225} 226 227// indexCommentAndUpdateCounts atomically indexes a comment and updates parent counts 228func (c *CommentEventConsumer) indexCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error { 229 tx, err := c.db.BeginTx(ctx, nil) 230 if err != nil { 231 return fmt.Errorf("failed to begin transaction: %w", err) 232 } 233 defer func() { 234 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 235 log.Printf("Failed to rollback transaction: %v", rollbackErr) 236 } 237 }() 238 239 // 1. Check if comment exists and handle resurrection case 240 // In atProto, deleted records' rkeys become available - users can recreate with same rkey 241 // We must distinguish: idempotent replay (skip) vs resurrection (update + restore counts) 242 var existingID int64 243 var existingDeletedAt *time.Time 244 checkQuery := `SELECT id, deleted_at FROM comments WHERE uri = $1` 245 checkErr := tx.QueryRowContext(ctx, checkQuery, comment.URI).Scan(&existingID, &existingDeletedAt) 246 247 var commentID int64 248 249 if checkErr == nil { 250 // Comment exists 251 if existingDeletedAt == nil { 252 // Not deleted - this is an idempotent replay, skip gracefully 253 log.Printf("Comment already indexed: %s (idempotent replay)", comment.URI) 254 if commitErr := tx.Commit(); commitErr != nil { 255 return fmt.Errorf("failed to commit transaction: %w", commitErr) 256 } 257 return nil 258 } 259 260 // Comment was soft-deleted, now being recreated (resurrection) 261 // This is a NEW record with same rkey - update ALL fields including threading refs 262 // User may have deleted old comment and created a new one on a different parent/root 263 // Clear deletion metadata to restore the comment 264 log.Printf("Resurrecting 
previously deleted comment: %s", comment.URI) 265 commentID = existingID 266 267 resurrectQuery := ` 268 UPDATE comments 269 SET 270 cid = $1, 271 commenter_did = $2, 272 root_uri = $3, 273 root_cid = $4, 274 parent_uri = $5, 275 parent_cid = $6, 276 content = $7, 277 content_facets = $8, 278 embed = $9, 279 content_labels = $10, 280 langs = $11, 281 created_at = $12, 282 indexed_at = $13, 283 deleted_at = NULL, 284 deletion_reason = NULL, 285 deleted_by = NULL, 286 reply_count = 0 287 WHERE id = $14 288 ` 289 290 _, err = tx.ExecContext( 291 ctx, resurrectQuery, 292 comment.CID, 293 comment.CommenterDID, 294 comment.RootURI, 295 comment.RootCID, 296 comment.ParentURI, 297 comment.ParentCID, 298 comment.Content, 299 comment.ContentFacets, 300 comment.Embed, 301 comment.ContentLabels, 302 pq.Array(comment.Langs), 303 comment.CreatedAt, 304 time.Now(), 305 commentID, 306 ) 307 if err != nil { 308 return fmt.Errorf("failed to resurrect comment: %w", err) 309 } 310 311 } else if checkErr == sql.ErrNoRows { 312 // Comment doesn't exist - insert new comment 313 // Use ON CONFLICT DO NOTHING to handle race conditions gracefully 314 // (e.g., duplicate Jetstream events from reconnections/retries) 315 insertQuery := ` 316 INSERT INTO comments ( 317 uri, cid, rkey, commenter_did, 318 root_uri, root_cid, parent_uri, parent_cid, 319 content, content_facets, embed, content_labels, langs, 320 created_at, indexed_at 321 ) VALUES ( 322 $1, $2, $3, $4, 323 $5, $6, $7, $8, 324 $9, $10, $11, $12, $13, 325 $14, $15 326 ) 327 ON CONFLICT (uri) DO NOTHING 328 RETURNING id 329 ` 330 331 err = tx.QueryRowContext( 332 ctx, insertQuery, 333 comment.URI, comment.CID, comment.RKey, comment.CommenterDID, 334 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID, 335 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs), 336 comment.CreatedAt, time.Now(), 337 ).Scan(&commentID) 338 if err == sql.ErrNoRows { 339 // ON CONFLICT 
triggered - comment was inserted by concurrent process 340 // This is an idempotent replay, skip gracefully 341 log.Printf("Comment already indexed (concurrent insert): %s", comment.URI) 342 if commitErr := tx.Commit(); commitErr != nil { 343 return fmt.Errorf("failed to commit transaction: %w", commitErr) 344 } 345 return nil 346 } 347 if err != nil { 348 return fmt.Errorf("failed to insert comment: %w", err) 349 } 350 351 } else { 352 // Unexpected error checking for existing comment 353 return fmt.Errorf("failed to check for existing comment: %w", checkErr) 354 } 355 356 // 1.5. Reconcile reply_count for this newly inserted comment 357 // In case any replies arrived out-of-order before this parent was indexed 358 reconcileQuery := ` 359 UPDATE comments 360 SET reply_count = ( 361 SELECT COUNT(*) 362 FROM comments c 363 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL 364 ) 365 WHERE id = $2 366 ` 367 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, comment.URI, commentID) 368 if reconcileErr != nil { 369 log.Printf("Warning: Failed to reconcile reply_count for %s: %v", comment.URI, reconcileErr) 370 // Continue anyway - this is a best-effort reconciliation 371 } 372 373 // 2. Update parent counts atomically 374 // Parent could be a post (increment comment_count) or a comment (increment reply_count) 375 // Parse collection from parent URI to determine target table 376 // 377 // NOTE: Post comment_count reconciliation IS implemented in post_consumer.go:210-226 378 // When a comment arrives before its parent post, the post update below returns 0 rows 379 // and we log a warning. Later, when the post is indexed, the post consumer reconciles 380 // comment_count by counting all pre-existing comments. This ensures accurate counts 381 // despite out-of-order Jetstream event delivery. 
382 // 383 // Test coverage: TestPostConsumer_CommentCountReconciliation in post_consumer_test.go 384 collection := utils.ExtractCollectionFromURI(comment.ParentURI) 385 386 var updateQuery string 387 switch collection { 388 case "social.coves.community.post": 389 // Comment on post - update posts.comment_count 390 updateQuery = ` 391 UPDATE posts 392 SET comment_count = comment_count + 1 393 WHERE uri = $1 AND deleted_at IS NULL 394 ` 395 396 case "social.coves.community.comment": 397 // Reply to comment - update comments.reply_count 398 updateQuery = ` 399 UPDATE comments 400 SET reply_count = reply_count + 1 401 WHERE uri = $1 AND deleted_at IS NULL 402 ` 403 404 default: 405 // Unknown or unsupported parent collection 406 // Comment is still indexed, we just don't update parent counts 407 log.Printf("Comment parent has unsupported collection: %s (comment indexed, parent count not updated)", collection) 408 if commitErr := tx.Commit(); commitErr != nil { 409 return fmt.Errorf("failed to commit transaction: %w", commitErr) 410 } 411 return nil 412 } 413 414 result, err := tx.ExecContext(ctx, updateQuery, comment.ParentURI) 415 if err != nil { 416 return fmt.Errorf("failed to update parent count: %w", err) 417 } 418 419 rowsAffected, err := result.RowsAffected() 420 if err != nil { 421 return fmt.Errorf("failed to check update result: %w", err) 422 } 423 424 // If parent not found, that's OK (parent might not be indexed yet) 425 if rowsAffected == 0 { 426 log.Printf("Warning: Parent not found or deleted: %s (comment indexed anyway)", comment.ParentURI) 427 } 428 429 // Commit transaction 430 if err := tx.Commit(); err != nil { 431 return fmt.Errorf("failed to commit transaction: %w", err) 432 } 433 434 return nil 435} 436 437// deleteCommentAndUpdateCounts atomically soft-deletes a comment and updates parent counts 438// Blanks content to preserve thread structure while respecting user privacy 439// The comment remains in the database but is shown as "[deleted]" 
in thread views 440func (c *CommentEventConsumer) deleteCommentAndUpdateCounts(ctx context.Context, comment *comments.Comment) error { 441 tx, err := c.db.BeginTx(ctx, nil) 442 if err != nil { 443 return fmt.Errorf("failed to begin transaction: %w", err) 444 } 445 defer func() { 446 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 447 log.Printf("Failed to rollback transaction: %v", rollbackErr) 448 } 449 }() 450 451 // 1. Soft-delete the comment: blank content but preserve structure 452 // DELETE event from Jetstream = author deleted their own comment 453 // Content is blanked to respect user privacy while preserving thread structure 454 // Use the repository's transaction-aware method for DRY 455 repoTx, ok := c.commentRepo.(comments.RepositoryTx) 456 if !ok { 457 return fmt.Errorf("comment repository does not support transactional operations") 458 } 459 460 rowsAffected, err := repoTx.SoftDeleteWithReasonTx(ctx, tx, comment.URI, comments.DeletionReasonAuthor, comment.CommenterDID) 461 if err != nil { 462 return fmt.Errorf("failed to delete comment: %w", err) 463 } 464 465 // Idempotent: If no rows affected, comment already deleted 466 if rowsAffected == 0 { 467 log.Printf("Comment already deleted: %s (idempotent)", comment.URI) 468 if commitErr := tx.Commit(); commitErr != nil { 469 return fmt.Errorf("failed to commit transaction: %w", commitErr) 470 } 471 return nil 472 } 473 474 // 2. 
Decrement parent counts atomically 475 // Parent could be a post or comment - parse collection to determine target table 476 collection := utils.ExtractCollectionFromURI(comment.ParentURI) 477 478 var updateQuery string 479 var result sql.Result 480 switch collection { 481 case "social.coves.community.post": 482 // Comment on post - decrement posts.comment_count 483 updateQuery = ` 484 UPDATE posts 485 SET comment_count = GREATEST(0, comment_count - 1) 486 WHERE uri = $1 AND deleted_at IS NULL 487 ` 488 489 case "social.coves.community.comment": 490 // Reply to comment - decrement comments.reply_count 491 updateQuery = ` 492 UPDATE comments 493 SET reply_count = GREATEST(0, reply_count - 1) 494 WHERE uri = $1 AND deleted_at IS NULL 495 ` 496 497 default: 498 // Unknown or unsupported parent collection 499 // Comment is still deleted, we just don't update parent counts 500 log.Printf("Comment parent has unsupported collection: %s (comment deleted, parent count not updated)", collection) 501 if commitErr := tx.Commit(); commitErr != nil { 502 return fmt.Errorf("failed to commit transaction: %w", commitErr) 503 } 504 return nil 505 } 506 507 result, err = tx.ExecContext(ctx, updateQuery, comment.ParentURI) 508 if err != nil { 509 return fmt.Errorf("failed to update parent count: %w", err) 510 } 511 512 rowsAffected, err = result.RowsAffected() 513 if err != nil { 514 return fmt.Errorf("failed to check update result: %w", err) 515 } 516 517 // If parent not found, that's OK (parent might be deleted) 518 if rowsAffected == 0 { 519 log.Printf("Warning: Parent not found or deleted: %s (comment deleted anyway)", comment.ParentURI) 520 } 521 522 // Commit transaction 523 if err := tx.Commit(); err != nil { 524 return fmt.Errorf("failed to commit transaction: %w", err) 525 } 526 527 return nil 528} 529 530// validateCommentEvent performs security validation on comment events 531func (c *CommentEventConsumer) validateCommentEvent(ctx context.Context, repoDID string, comment 
*CommentRecordFromJetstream) error { 532 // SECURITY: Comments MUST come from user repositories (repo owner = commenter DID) 533 // The repository owner (repoDID) IS the commenter - comments are stored in user repos. 534 // 535 // We do NOT check if the user exists in AppView because: 536 // 1. Comment events may arrive before user events in Jetstream (race condition) 537 // 2. The comment came from the user's PDS repository (authenticated by PDS) 538 // 3. The database FK constraint was removed to allow out-of-order indexing 539 // 4. Orphaned comments (from never-indexed users) are harmless 540 // 541 // Security is maintained because: 542 // - Comment must come from user's own PDS repository (verified by atProto) 543 // - Fake DIDs will fail PDS authentication 544 545 // Validate DID format (basic sanity check) 546 if !strings.HasPrefix(repoDID, "did:") { 547 return fmt.Errorf("invalid commenter DID format: %s", repoDID) 548 } 549 550 // Validate content is not empty (required per lexicon) 551 if comment.Content == "" { 552 return fmt.Errorf("comment content is required") 553 } 554 555 // Validate content length (defensive check - PDS should enforce this) 556 // Per lexicon: max 3000 graphemes, ~30000 bytes 557 // We check bytes as a simple defensive measure 558 if len(comment.Content) > MaxCommentContentBytes { 559 return fmt.Errorf("comment content exceeds maximum length (%d bytes): got %d bytes", MaxCommentContentBytes, len(comment.Content)) 560 } 561 562 // Validate reply references exist 563 if comment.Reply.Root.URI == "" || comment.Reply.Root.CID == "" { 564 return fmt.Errorf("invalid root reference: must have both URI and CID") 565 } 566 567 if comment.Reply.Parent.URI == "" || comment.Reply.Parent.CID == "" { 568 return fmt.Errorf("invalid parent reference: must have both URI and CID") 569 } 570 571 // Validate AT-URI structure for root and parent 572 if err := validateATURI(comment.Reply.Root.URI); err != nil { 573 return fmt.Errorf("invalid root URI: 
%w", err) 574 } 575 576 if err := validateATURI(comment.Reply.Parent.URI); err != nil { 577 return fmt.Errorf("invalid parent URI: %w", err) 578 } 579 580 return nil 581} 582 583// validateATURI performs basic structure validation on AT-URIs 584// Format: at://did:method:id/collection/rkey 585// This is defensive validation - we trust PDS but catch obviously malformed URIs 586func validateATURI(uri string) error { 587 if !strings.HasPrefix(uri, ATProtoScheme) { 588 return fmt.Errorf("must start with %s", ATProtoScheme) 589 } 590 591 // Remove at:// prefix and split by / 592 withoutScheme := strings.TrimPrefix(uri, ATProtoScheme) 593 parts := strings.Split(withoutScheme, "/") 594 595 // Must have at least 3 parts: did, collection, rkey 596 if len(parts) < 3 { 597 return fmt.Errorf("invalid structure (expected at://did/collection/rkey)") 598 } 599 600 // First part should be a DID 601 if !strings.HasPrefix(parts[0], "did:") { 602 return fmt.Errorf("repository identifier must be a DID") 603 } 604 605 // Collection and rkey should not be empty 606 if parts[1] == "" || parts[2] == "" { 607 return fmt.Errorf("collection and rkey cannot be empty") 608 } 609 610 return nil 611} 612 613// CommentRecordFromJetstream represents a comment record as received from Jetstream 614// Matches social.coves.community.comment lexicon 615type CommentRecordFromJetstream struct { 616 Labels interface{} `json:"labels,omitempty"` 617 Embed map[string]interface{} `json:"embed,omitempty"` 618 Reply ReplyRefFromJetstream `json:"reply"` 619 Type string `json:"$type"` 620 Content string `json:"content"` 621 CreatedAt string `json:"createdAt"` 622 Facets []interface{} `json:"facets,omitempty"` 623 Langs []string `json:"langs,omitempty"` 624} 625 626// ReplyRefFromJetstream represents the threading structure 627type ReplyRefFromJetstream struct { 628 Root StrongRefFromJetstream `json:"root"` 629 Parent StrongRefFromJetstream `json:"parent"` 630} 631 632// parseCommentRecord parses a comment record 
from Jetstream event data 633func parseCommentRecord(record map[string]interface{}) (*CommentRecordFromJetstream, error) { 634 // Marshal to JSON and back for proper type conversion 635 recordJSON, err := json.Marshal(record) 636 if err != nil { 637 return nil, fmt.Errorf("failed to marshal record: %w", err) 638 } 639 640 var comment CommentRecordFromJetstream 641 if err := json.Unmarshal(recordJSON, &comment); err != nil { 642 return nil, fmt.Errorf("failed to unmarshal comment record: %w", err) 643 } 644 645 // Validate required fields 646 if comment.Content == "" { 647 return nil, fmt.Errorf("comment record missing content field") 648 } 649 650 if comment.CreatedAt == "" { 651 return nil, fmt.Errorf("comment record missing createdAt field") 652 } 653 654 return &comment, nil 655} 656 657// serializeOptionalFields serializes facets, embed, and labels from a comment record to JSON strings 658// Returns nil pointers for empty/nil fields (DRY helper to avoid duplication) 659func serializeOptionalFields(commentRecord *CommentRecordFromJetstream) (facetsJSON, embedJSON, labelsJSON *string) { 660 // Serialize facets if present 661 if len(commentRecord.Facets) > 0 { 662 if facetsBytes, err := json.Marshal(commentRecord.Facets); err == nil { 663 facetsStr := string(facetsBytes) 664 facetsJSON = &facetsStr 665 } 666 } 667 668 // Serialize embed if present 669 if len(commentRecord.Embed) > 0 { 670 if embedBytes, err := json.Marshal(commentRecord.Embed); err == nil { 671 embedStr := string(embedBytes) 672 embedJSON = &embedStr 673 } 674 } 675 676 // Serialize labels if present 677 if commentRecord.Labels != nil { 678 if labelsBytes, err := json.Marshal(commentRecord.Labels); err == nil { 679 labelsStr := string(labelsBytes) 680 labelsJSON = &labelsStr 681 } 682 } 683 684 return facetsJSON, embedJSON, labelsJSON 685}