A community based topic aggregation platform built on atproto
1package postgres
2
3import (
4 "Coves/internal/core/posts"
5 "context"
6 "database/sql"
7 "encoding/json"
8 "fmt"
9 "strings"
10
11 "github.com/lib/pq"
12)
13
14type postgresPostRepo struct {
15 db *sql.DB
16}
17
18// NewPostRepository creates a new PostgreSQL post repository
19func NewPostRepository(db *sql.DB) posts.Repository {
20 return &postgresPostRepo{db: db}
21}
22
23// Create inserts a new post into the posts table
24// Called by Jetstream consumer after post is created on PDS
25func (r *postgresPostRepo) Create(ctx context.Context, post *posts.Post) error {
26 // Serialize JSON fields for storage
27 var facetsJSON, embedJSON sql.NullString
28
29 if post.ContentFacets != nil {
30 facetsJSON.String = *post.ContentFacets
31 facetsJSON.Valid = true
32 }
33
34 if post.Embed != nil {
35 embedJSON.String = *post.Embed
36 embedJSON.Valid = true
37 }
38
39 // Convert content labels to PostgreSQL array
40 var labelsArray pq.StringArray
41 if post.ContentLabels != nil {
42 // Parse JSON array string to []string
43 var labels []string
44 if err := json.Unmarshal([]byte(*post.ContentLabels), &labels); err == nil {
45 labelsArray = labels
46 }
47 }
48
49 query := `
50 INSERT INTO posts (
51 uri, cid, rkey, author_did, community_did,
52 title, content, content_facets, embed, content_labels,
53 created_at, indexed_at
54 ) VALUES (
55 $1, $2, $3, $4, $5,
56 $6, $7, $8, $9, $10,
57 $11, NOW()
58 )
59 RETURNING id, indexed_at
60 `
61
62 err := r.db.QueryRowContext(
63 ctx, query,
64 post.URI, post.CID, post.RKey, post.AuthorDID, post.CommunityDID,
65 post.Title, post.Content, facetsJSON, embedJSON, labelsArray,
66 post.CreatedAt,
67 ).Scan(&post.ID, &post.IndexedAt)
68 if err != nil {
69 // Check for duplicate URI (post already indexed)
70 if strings.Contains(err.Error(), "duplicate key") && strings.Contains(err.Error(), "posts_uri_key") {
71 return fmt.Errorf("post already indexed: %s", post.URI)
72 }
73
74 // Check for foreign key violations
75 if strings.Contains(err.Error(), "violates foreign key constraint") {
76 if strings.Contains(err.Error(), "fk_author") {
77 return fmt.Errorf("author DID not found: %s", post.AuthorDID)
78 }
79 if strings.Contains(err.Error(), "fk_community") {
80 return fmt.Errorf("community DID not found: %s", post.CommunityDID)
81 }
82 }
83
84 return fmt.Errorf("failed to insert post: %w", err)
85 }
86
87 return nil
88}
89
90// GetByURI retrieves a post by its AT-URI
91// Used for E2E test verification and future GET endpoint
92func (r *postgresPostRepo) GetByURI(ctx context.Context, uri string) (*posts.Post, error) {
93 query := `
94 SELECT
95 id, uri, cid, rkey, author_did, community_did,
96 title, content, content_facets, embed, content_labels,
97 created_at, edited_at, indexed_at, deleted_at,
98 upvote_count, downvote_count, score, comment_count
99 FROM posts
100 WHERE uri = $1
101 `
102
103 var post posts.Post
104 var facetsJSON, embedJSON sql.NullString
105 var contentLabels pq.StringArray
106
107 err := r.db.QueryRowContext(ctx, query, uri).Scan(
108 &post.ID, &post.URI, &post.CID, &post.RKey,
109 &post.AuthorDID, &post.CommunityDID,
110 &post.Title, &post.Content, &facetsJSON, &embedJSON, &contentLabels,
111 &post.CreatedAt, &post.EditedAt, &post.IndexedAt, &post.DeletedAt,
112 &post.UpvoteCount, &post.DownvoteCount, &post.Score, &post.CommentCount,
113 )
114
115 if err == sql.ErrNoRows {
116 return nil, posts.ErrNotFound
117 }
118 if err != nil {
119 return nil, fmt.Errorf("failed to get post by URI: %w", err)
120 }
121
122 // Convert SQL types back to Go types
123 if facetsJSON.Valid {
124 post.ContentFacets = &facetsJSON.String
125 }
126 if embedJSON.Valid {
127 post.Embed = &embedJSON.String
128 }
129 if len(contentLabels) > 0 {
130 labelsJSON, marshalErr := json.Marshal(contentLabels)
131 if marshalErr == nil {
132 labelsStr := string(labelsJSON)
133 post.ContentLabels = &labelsStr
134 }
135 }
136
137 return &post, nil
138}