A community based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "Coves/internal/core/posts" 5 "context" 6 "database/sql" 7 "fmt" 8 "strings" 9) 10 11type postgresPostRepo struct { 12 db *sql.DB 13} 14 15// NewPostRepository creates a new PostgreSQL post repository 16func NewPostRepository(db *sql.DB) posts.Repository { 17 return &postgresPostRepo{db: db} 18} 19 20// Create inserts a new post into the posts table 21// Called by Jetstream consumer after post is created on PDS 22func (r *postgresPostRepo) Create(ctx context.Context, post *posts.Post) error { 23 // Serialize JSON fields for storage 24 var facetsJSON, embedJSON sql.NullString 25 26 if post.ContentFacets != nil { 27 facetsJSON.String = *post.ContentFacets 28 facetsJSON.Valid = true 29 } 30 31 if post.Embed != nil { 32 embedJSON.String = *post.Embed 33 embedJSON.Valid = true 34 } 35 36 // Store content labels as JSONB 37 // post.ContentLabels contains com.atproto.label.defs#selfLabels JSON: {"values":[{"val":"nsfw","neg":false}]} 38 // Store the full JSON blob to preserve the 'neg' field and future extensions 39 var labelsJSON sql.NullString 40 if post.ContentLabels != nil { 41 labelsJSON.String = *post.ContentLabels 42 labelsJSON.Valid = true 43 } 44 45 query := ` 46 INSERT INTO posts ( 47 uri, cid, rkey, author_did, community_did, 48 title, content, content_facets, embed, content_labels, 49 created_at, indexed_at 50 ) VALUES ( 51 $1, $2, $3, $4, $5, 52 $6, $7, $8, $9, $10, 53 $11, NOW() 54 ) 55 RETURNING id, indexed_at 56 ` 57 58 err := r.db.QueryRowContext( 59 ctx, query, 60 post.URI, post.CID, post.RKey, post.AuthorDID, post.CommunityDID, 61 post.Title, post.Content, facetsJSON, embedJSON, labelsJSON, 62 post.CreatedAt, 63 ).Scan(&post.ID, &post.IndexedAt) 64 if err != nil { 65 // Check for duplicate URI (post already indexed) 66 if strings.Contains(err.Error(), "duplicate key") && strings.Contains(err.Error(), "posts_uri_key") { 67 return fmt.Errorf("post already indexed: %s", post.URI) 68 } 69 70 // Check for foreign key violations 71 if strings.Contains(err.Error(), "violates foreign key constraint") { 72 if strings.Contains(err.Error(), "fk_author") { 73 return fmt.Errorf("author DID not found: %s", post.AuthorDID) 74 } 75 if strings.Contains(err.Error(), "fk_community") { 76 return fmt.Errorf("community DID not found: %s", post.CommunityDID) 77 } 78 } 79 80 return fmt.Errorf("failed to insert post: %w", err) 81 } 82 83 return nil 84} 85 86// GetByURI retrieves a post by its AT-URI 87// Used for E2E test verification and future GET endpoint 88func (r *postgresPostRepo) GetByURI(ctx context.Context, uri string) (*posts.Post, error) { 89 query := ` 90 SELECT 91 id, uri, cid, rkey, author_did, community_did, 92 title, content, content_facets, embed, content_labels, 93 created_at, edited_at, indexed_at, deleted_at, 94 upvote_count, downvote_count, score, comment_count 95 FROM posts 96 WHERE uri = $1 97 ` 98 99 var post posts.Post 100 var facetsJSON, embedJSON, labelsJSON sql.NullString 101 102 err := r.db.QueryRowContext(ctx, query, uri).Scan( 103 &post.ID, &post.URI, &post.CID, &post.RKey, 104 &post.AuthorDID, &post.CommunityDID, 105 &post.Title, &post.Content, &facetsJSON, &embedJSON, &labelsJSON, 106 &post.CreatedAt, &post.EditedAt, &post.IndexedAt, &post.DeletedAt, 107 &post.UpvoteCount, &post.DownvoteCount, &post.Score, &post.CommentCount, 108 ) 109 110 if err == sql.ErrNoRows { 111 return nil, posts.ErrNotFound 112 } 113 if err != nil { 114 return nil, fmt.Errorf("failed to get post by URI: %w", err) 115 } 116 117 // Convert SQL types back to Go types 118 if facetsJSON.Valid { 119 post.ContentFacets = &facetsJSON.String 120 } 121 if embedJSON.Valid { 122 post.Embed = &embedJSON.String 123 } 124 if labelsJSON.Valid { 125 // Labels are stored as JSONB containing full com.atproto.label.defs#selfLabels structure 126 post.ContentLabels = &labelsJSON.String 127 } 128 129 return &post, nil 130}