A community-based topic aggregation platform built on atproto
1package postgres
2
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strings"

	"Coves/internal/core/posts"
)
11
// postgresPostRepo is the database/sql-backed implementation returned by
// NewPostRepository. All of its methods run their queries through db.
type postgresPostRepo struct {
	// db is the handle every query in this repository is executed on.
	db *sql.DB
}
15
16// NewPostRepository creates a new PostgreSQL post repository
17func NewPostRepository(db *sql.DB) posts.Repository {
18 return &postgresPostRepo{db: db}
19}
20
21// Create inserts a new post into the posts table
22// Called by Jetstream consumer after post is created on PDS
23func (r *postgresPostRepo) Create(ctx context.Context, post *posts.Post) error {
24 // Serialize JSON fields for storage
25 var facetsJSON, embedJSON sql.NullString
26
27 if post.ContentFacets != nil {
28 facetsJSON.String = *post.ContentFacets
29 facetsJSON.Valid = true
30 }
31
32 if post.Embed != nil {
33 embedJSON.String = *post.Embed
34 embedJSON.Valid = true
35 }
36
37 // Store content labels as JSONB
38 // post.ContentLabels contains com.atproto.label.defs#selfLabels JSON: {"values":[{"val":"nsfw","neg":false}]}
39 // Store the full JSON blob to preserve the 'neg' field and future extensions
40 var labelsJSON sql.NullString
41 if post.ContentLabels != nil {
42 labelsJSON.String = *post.ContentLabels
43 labelsJSON.Valid = true
44 }
45
46 query := `
47 INSERT INTO posts (
48 uri, cid, rkey, author_did, community_did,
49 title, content, content_facets, embed, content_labels,
50 created_at, indexed_at
51 ) VALUES (
52 $1, $2, $3, $4, $5,
53 $6, $7, $8, $9, $10,
54 $11, NOW()
55 )
56 RETURNING id, indexed_at
57 `
58
59 err := r.db.QueryRowContext(
60 ctx, query,
61 post.URI, post.CID, post.RKey, post.AuthorDID, post.CommunityDID,
62 post.Title, post.Content, facetsJSON, embedJSON, labelsJSON,
63 post.CreatedAt,
64 ).Scan(&post.ID, &post.IndexedAt)
65 if err != nil {
66 // Check for duplicate URI (post already indexed)
67 if strings.Contains(err.Error(), "duplicate key") && strings.Contains(err.Error(), "posts_uri_key") {
68 return fmt.Errorf("post already indexed: %s", post.URI)
69 }
70
71 // Check for foreign key violations
72 if strings.Contains(err.Error(), "violates foreign key constraint") {
73 if strings.Contains(err.Error(), "fk_author") {
74 return fmt.Errorf("author DID not found: %s", post.AuthorDID)
75 }
76 if strings.Contains(err.Error(), "fk_community") {
77 return fmt.Errorf("community DID not found: %s", post.CommunityDID)
78 }
79 }
80
81 return fmt.Errorf("failed to insert post: %w", err)
82 }
83
84 return nil
85}
86
87// GetByURI retrieves a post by its AT-URI
88// Used for E2E test verification and future GET endpoint
89func (r *postgresPostRepo) GetByURI(ctx context.Context, uri string) (*posts.Post, error) {
90 query := `
91 SELECT
92 id, uri, cid, rkey, author_did, community_did,
93 title, content, content_facets, embed, content_labels,
94 created_at, edited_at, indexed_at, deleted_at,
95 upvote_count, downvote_count, score, comment_count
96 FROM posts
97 WHERE uri = $1
98 `
99
100 var post posts.Post
101 var facetsJSON, embedJSON, labelsJSON sql.NullString
102
103 err := r.db.QueryRowContext(ctx, query, uri).Scan(
104 &post.ID, &post.URI, &post.CID, &post.RKey,
105 &post.AuthorDID, &post.CommunityDID,
106 &post.Title, &post.Content, &facetsJSON, &embedJSON, &labelsJSON,
107 &post.CreatedAt, &post.EditedAt, &post.IndexedAt, &post.DeletedAt,
108 &post.UpvoteCount, &post.DownvoteCount, &post.Score, &post.CommentCount,
109 )
110
111 if err == sql.ErrNoRows {
112 return nil, posts.ErrNotFound
113 }
114 if err != nil {
115 return nil, fmt.Errorf("failed to get post by URI: %w", err)
116 }
117
118 // Convert SQL types back to Go types
119 if facetsJSON.Valid {
120 post.ContentFacets = &facetsJSON.String
121 }
122 if embedJSON.Valid {
123 post.Embed = &embedJSON.String
124 }
125 if labelsJSON.Valid {
126 // Labels are stored as JSONB containing full com.atproto.label.defs#selfLabels structure
127 post.ContentLabels = &labelsJSON.String
128 }
129
130 return &post, nil
131}