A community-based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "Coves/internal/core/posts" 5 "context" 6 "database/sql" 7 "encoding/json" 8 "fmt" 9 "strings" 10 11 "github.com/lib/pq" 12) 13 14type postgresPostRepo struct { 15 db *sql.DB 16} 17 18// NewPostRepository creates a new PostgreSQL post repository 19func NewPostRepository(db *sql.DB) posts.Repository { 20 return &postgresPostRepo{db: db} 21} 22 23// Create inserts a new post into the posts table 24// Called by Jetstream consumer after post is created on PDS 25func (r *postgresPostRepo) Create(ctx context.Context, post *posts.Post) error { 26 // Serialize JSON fields for storage 27 var facetsJSON, embedJSON sql.NullString 28 29 if post.ContentFacets != nil { 30 facetsJSON.String = *post.ContentFacets 31 facetsJSON.Valid = true 32 } 33 34 if post.Embed != nil { 35 embedJSON.String = *post.Embed 36 embedJSON.Valid = true 37 } 38 39 // Convert content labels to PostgreSQL array 40 var labelsArray pq.StringArray 41 if post.ContentLabels != nil { 42 // Parse JSON array string to []string 43 var labels []string 44 if err := json.Unmarshal([]byte(*post.ContentLabels), &labels); err == nil { 45 labelsArray = labels 46 } 47 } 48 49 query := ` 50 INSERT INTO posts ( 51 uri, cid, rkey, author_did, community_did, 52 title, content, content_facets, embed, content_labels, 53 created_at, indexed_at 54 ) VALUES ( 55 $1, $2, $3, $4, $5, 56 $6, $7, $8, $9, $10, 57 $11, NOW() 58 ) 59 RETURNING id, indexed_at 60 ` 61 62 err := r.db.QueryRowContext( 63 ctx, query, 64 post.URI, post.CID, post.RKey, post.AuthorDID, post.CommunityDID, 65 post.Title, post.Content, facetsJSON, embedJSON, labelsArray, 66 post.CreatedAt, 67 ).Scan(&post.ID, &post.IndexedAt) 68 if err != nil { 69 // Check for duplicate URI (post already indexed) 70 if strings.Contains(err.Error(), "duplicate key") && strings.Contains(err.Error(), "posts_uri_key") { 71 return fmt.Errorf("post already indexed: %s", post.URI) 72 } 73 74 // Check for foreign key violations 75 if 
strings.Contains(err.Error(), "violates foreign key constraint") { 76 if strings.Contains(err.Error(), "fk_author") { 77 return fmt.Errorf("author DID not found: %s", post.AuthorDID) 78 } 79 if strings.Contains(err.Error(), "fk_community") { 80 return fmt.Errorf("community DID not found: %s", post.CommunityDID) 81 } 82 } 83 84 return fmt.Errorf("failed to insert post: %w", err) 85 } 86 87 return nil 88} 89 90// GetByURI retrieves a post by its AT-URI 91// Used for E2E test verification and future GET endpoint 92func (r *postgresPostRepo) GetByURI(ctx context.Context, uri string) (*posts.Post, error) { 93 query := ` 94 SELECT 95 id, uri, cid, rkey, author_did, community_did, 96 title, content, content_facets, embed, content_labels, 97 created_at, edited_at, indexed_at, deleted_at, 98 upvote_count, downvote_count, score, comment_count 99 FROM posts 100 WHERE uri = $1 101 ` 102 103 var post posts.Post 104 var facetsJSON, embedJSON sql.NullString 105 var contentLabels pq.StringArray 106 107 err := r.db.QueryRowContext(ctx, query, uri).Scan( 108 &post.ID, &post.URI, &post.CID, &post.RKey, 109 &post.AuthorDID, &post.CommunityDID, 110 &post.Title, &post.Content, &facetsJSON, &embedJSON, &contentLabels, 111 &post.CreatedAt, &post.EditedAt, &post.IndexedAt, &post.DeletedAt, 112 &post.UpvoteCount, &post.DownvoteCount, &post.Score, &post.CommentCount, 113 ) 114 115 if err == sql.ErrNoRows { 116 return nil, posts.ErrNotFound 117 } 118 if err != nil { 119 return nil, fmt.Errorf("failed to get post by URI: %w", err) 120 } 121 122 // Convert SQL types back to Go types 123 if facetsJSON.Valid { 124 post.ContentFacets = &facetsJSON.String 125 } 126 if embedJSON.Valid { 127 post.Embed = &embedJSON.String 128 } 129 if len(contentLabels) > 0 { 130 labelsJSON, marshalErr := json.Marshal(contentLabels) 131 if marshalErr == nil { 132 labelsStr := string(labelsJSON) 133 post.ContentLabels = &labelsStr 134 } 135 } 136 137 return &post, nil 138}