A community-based topic aggregation platform built on atproto
1package postgres
2
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strings"

	"Coves/internal/core/posts"
)
10
// postgresPostRepo is the PostgreSQL-backed post repository handed out by
// NewPostRepository as a posts.Repository.
type postgresPostRepo struct {
	// db is the database handle the repository methods run their queries on.
	db *sql.DB
}
14
15// NewPostRepository creates a new PostgreSQL post repository
16func NewPostRepository(db *sql.DB) posts.Repository {
17 return &postgresPostRepo{db: db}
18}
19
20// Create inserts a new post into the posts table
21// Called by Jetstream consumer after post is created on PDS
22func (r *postgresPostRepo) Create(ctx context.Context, post *posts.Post) error {
23 // Serialize JSON fields for storage
24 var facetsJSON, embedJSON sql.NullString
25
26 if post.ContentFacets != nil {
27 facetsJSON.String = *post.ContentFacets
28 facetsJSON.Valid = true
29 }
30
31 if post.Embed != nil {
32 embedJSON.String = *post.Embed
33 embedJSON.Valid = true
34 }
35
36 // Store content labels as JSONB
37 // post.ContentLabels contains com.atproto.label.defs#selfLabels JSON: {"values":[{"val":"nsfw","neg":false}]}
38 // Store the full JSON blob to preserve the 'neg' field and future extensions
39 var labelsJSON sql.NullString
40 if post.ContentLabels != nil {
41 labelsJSON.String = *post.ContentLabels
42 labelsJSON.Valid = true
43 }
44
45 query := `
46 INSERT INTO posts (
47 uri, cid, rkey, author_did, community_did,
48 title, content, content_facets, embed, content_labels,
49 created_at, indexed_at
50 ) VALUES (
51 $1, $2, $3, $4, $5,
52 $6, $7, $8, $9, $10,
53 $11, NOW()
54 )
55 RETURNING id, indexed_at
56 `
57
58 err := r.db.QueryRowContext(
59 ctx, query,
60 post.URI, post.CID, post.RKey, post.AuthorDID, post.CommunityDID,
61 post.Title, post.Content, facetsJSON, embedJSON, labelsJSON,
62 post.CreatedAt,
63 ).Scan(&post.ID, &post.IndexedAt)
64 if err != nil {
65 // Check for duplicate URI (post already indexed)
66 if strings.Contains(err.Error(), "duplicate key") && strings.Contains(err.Error(), "posts_uri_key") {
67 return fmt.Errorf("post already indexed: %s", post.URI)
68 }
69
70 // Check for foreign key violations
71 if strings.Contains(err.Error(), "violates foreign key constraint") {
72 if strings.Contains(err.Error(), "fk_author") {
73 return fmt.Errorf("author DID not found: %s", post.AuthorDID)
74 }
75 if strings.Contains(err.Error(), "fk_community") {
76 return fmt.Errorf("community DID not found: %s", post.CommunityDID)
77 }
78 }
79
80 return fmt.Errorf("failed to insert post: %w", err)
81 }
82
83 return nil
84}
85
86// GetByURI retrieves a post by its AT-URI
87// Used for E2E test verification and future GET endpoint
88func (r *postgresPostRepo) GetByURI(ctx context.Context, uri string) (*posts.Post, error) {
89 query := `
90 SELECT
91 id, uri, cid, rkey, author_did, community_did,
92 title, content, content_facets, embed, content_labels,
93 created_at, edited_at, indexed_at, deleted_at,
94 upvote_count, downvote_count, score, comment_count
95 FROM posts
96 WHERE uri = $1
97 `
98
99 var post posts.Post
100 var facetsJSON, embedJSON, labelsJSON sql.NullString
101
102 err := r.db.QueryRowContext(ctx, query, uri).Scan(
103 &post.ID, &post.URI, &post.CID, &post.RKey,
104 &post.AuthorDID, &post.CommunityDID,
105 &post.Title, &post.Content, &facetsJSON, &embedJSON, &labelsJSON,
106 &post.CreatedAt, &post.EditedAt, &post.IndexedAt, &post.DeletedAt,
107 &post.UpvoteCount, &post.DownvoteCount, &post.Score, &post.CommentCount,
108 )
109
110 if err == sql.ErrNoRows {
111 return nil, posts.ErrNotFound
112 }
113 if err != nil {
114 return nil, fmt.Errorf("failed to get post by URI: %w", err)
115 }
116
117 // Convert SQL types back to Go types
118 if facetsJSON.Valid {
119 post.ContentFacets = &facetsJSON.String
120 }
121 if embedJSON.Valid {
122 post.Embed = &embedJSON.String
123 }
124 if labelsJSON.Valid {
125 // Labels are stored as JSONB containing full com.atproto.label.defs#selfLabels structure
126 post.ContentLabels = &labelsJSON.String
127 }
128
129 return &post, nil
130}