A community based topic aggregation platform built on atproto
1package postgres
2
3import (
4 "Coves/internal/core/comments"
5 "context"
6 "database/sql"
7 "fmt"
8 "log"
9 "strings"
10
11 "github.com/lib/pq"
12)
13
14type postgresCommentRepo struct {
15 db *sql.DB
16}
17
18// NewCommentRepository creates a new PostgreSQL comment repository
19func NewCommentRepository(db *sql.DB) comments.Repository {
20 return &postgresCommentRepo{db: db}
21}
22
23// Create inserts a new comment into the comments table
24// Called by Jetstream consumer after comment is created on PDS
25// Idempotent: Returns success if comment already exists (for Jetstream replays)
26func (r *postgresCommentRepo) Create(ctx context.Context, comment *comments.Comment) error {
27 query := `
28 INSERT INTO comments (
29 uri, cid, rkey, commenter_did,
30 root_uri, root_cid, parent_uri, parent_cid,
31 content, content_facets, embed, content_labels, langs,
32 created_at, indexed_at
33 ) VALUES (
34 $1, $2, $3, $4,
35 $5, $6, $7, $8,
36 $9, $10, $11, $12, $13,
37 $14, NOW()
38 )
39 ON CONFLICT (uri) DO NOTHING
40 RETURNING id, indexed_at
41 `
42
43 err := r.db.QueryRowContext(
44 ctx, query,
45 comment.URI, comment.CID, comment.RKey, comment.CommenterDID,
46 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID,
47 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs),
48 comment.CreatedAt,
49 ).Scan(&comment.ID, &comment.IndexedAt)
50
51 // ON CONFLICT DO NOTHING returns no rows if duplicate - this is OK (idempotent)
52 if err == sql.ErrNoRows {
53 return nil // Comment already exists, no error for idempotency
54 }
55
56 if err != nil {
57 // Check for unique constraint violation
58 if strings.Contains(err.Error(), "duplicate key") {
59 return comments.ErrCommentAlreadyExists
60 }
61
62 return fmt.Errorf("failed to insert comment: %w", err)
63 }
64
65 return nil
66}
67
68// Update modifies an existing comment's content fields
69// Called by Jetstream consumer after comment is updated on PDS
70// Preserves vote counts and created_at timestamp
71func (r *postgresCommentRepo) Update(ctx context.Context, comment *comments.Comment) error {
72 query := `
73 UPDATE comments
74 SET
75 cid = $1,
76 content = $2,
77 content_facets = $3,
78 embed = $4,
79 content_labels = $5,
80 langs = $6
81 WHERE uri = $7 AND deleted_at IS NULL
82 RETURNING id, indexed_at, created_at, upvote_count, downvote_count, score, reply_count
83 `
84
85 err := r.db.QueryRowContext(
86 ctx, query,
87 comment.CID,
88 comment.Content,
89 comment.ContentFacets,
90 comment.Embed,
91 comment.ContentLabels,
92 pq.Array(comment.Langs),
93 comment.URI,
94 ).Scan(
95 &comment.ID,
96 &comment.IndexedAt,
97 &comment.CreatedAt,
98 &comment.UpvoteCount,
99 &comment.DownvoteCount,
100 &comment.Score,
101 &comment.ReplyCount,
102 )
103
104 if err == sql.ErrNoRows {
105 return comments.ErrCommentNotFound
106 }
107 if err != nil {
108 return fmt.Errorf("failed to update comment: %w", err)
109 }
110
111 return nil
112}
113
114// GetByURI retrieves a comment by its AT-URI
115// Used by Jetstream consumer for UPDATE/DELETE operations
116func (r *postgresCommentRepo) GetByURI(ctx context.Context, uri string) (*comments.Comment, error) {
117 query := `
118 SELECT
119 id, uri, cid, rkey, commenter_did,
120 root_uri, root_cid, parent_uri, parent_cid,
121 content, content_facets, embed, content_labels, langs,
122 created_at, indexed_at, deleted_at,
123 upvote_count, downvote_count, score, reply_count
124 FROM comments
125 WHERE uri = $1
126 `
127
128 var comment comments.Comment
129 var langs pq.StringArray
130
131 err := r.db.QueryRowContext(ctx, query, uri).Scan(
132 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
133 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
134 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
135 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
136 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
137 )
138
139 if err == sql.ErrNoRows {
140 return nil, comments.ErrCommentNotFound
141 }
142 if err != nil {
143 return nil, fmt.Errorf("failed to get comment by URI: %w", err)
144 }
145
146 comment.Langs = langs
147
148 return &comment, nil
149}
150
151// Delete soft-deletes a comment (sets deleted_at)
152// Called by Jetstream consumer after comment is deleted from PDS
153// Idempotent: Returns success if comment already deleted
154func (r *postgresCommentRepo) Delete(ctx context.Context, uri string) error {
155 query := `
156 UPDATE comments
157 SET deleted_at = NOW()
158 WHERE uri = $1 AND deleted_at IS NULL
159 `
160
161 result, err := r.db.ExecContext(ctx, query, uri)
162 if err != nil {
163 return fmt.Errorf("failed to delete comment: %w", err)
164 }
165
166 rowsAffected, err := result.RowsAffected()
167 if err != nil {
168 return fmt.Errorf("failed to check delete result: %w", err)
169 }
170
171 // Idempotent: If no rows affected, comment already deleted (OK for Jetstream replays)
172 if rowsAffected == 0 {
173 return nil
174 }
175
176 return nil
177}
178
179// ListByRoot retrieves all active comments in a thread (flat)
180// Used for fetching entire comment threads on posts
181func (r *postgresCommentRepo) ListByRoot(ctx context.Context, rootURI string, limit, offset int) ([]*comments.Comment, error) {
182 query := `
183 SELECT
184 id, uri, cid, rkey, commenter_did,
185 root_uri, root_cid, parent_uri, parent_cid,
186 content, content_facets, embed, content_labels, langs,
187 created_at, indexed_at, deleted_at,
188 upvote_count, downvote_count, score, reply_count
189 FROM comments
190 WHERE root_uri = $1 AND deleted_at IS NULL
191 ORDER BY created_at ASC
192 LIMIT $2 OFFSET $3
193 `
194
195 rows, err := r.db.QueryContext(ctx, query, rootURI, limit, offset)
196 if err != nil {
197 return nil, fmt.Errorf("failed to list comments by root: %w", err)
198 }
199 defer func() {
200 if err := rows.Close(); err != nil {
201 log.Printf("Failed to close rows: %v", err)
202 }
203 }()
204
205 var result []*comments.Comment
206 for rows.Next() {
207 var comment comments.Comment
208 var langs pq.StringArray
209
210 err := rows.Scan(
211 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
212 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
213 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
214 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
215 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
216 )
217 if err != nil {
218 return nil, fmt.Errorf("failed to scan comment: %w", err)
219 }
220
221 comment.Langs = langs
222 result = append(result, &comment)
223 }
224
225 if err = rows.Err(); err != nil {
226 return nil, fmt.Errorf("error iterating comments: %w", err)
227 }
228
229 return result, nil
230}
231
232// ListByParent retrieves direct replies to a post or comment
233// Used for building nested/threaded comment views
234func (r *postgresCommentRepo) ListByParent(ctx context.Context, parentURI string, limit, offset int) ([]*comments.Comment, error) {
235 query := `
236 SELECT
237 id, uri, cid, rkey, commenter_did,
238 root_uri, root_cid, parent_uri, parent_cid,
239 content, content_facets, embed, content_labels, langs,
240 created_at, indexed_at, deleted_at,
241 upvote_count, downvote_count, score, reply_count
242 FROM comments
243 WHERE parent_uri = $1 AND deleted_at IS NULL
244 ORDER BY created_at ASC
245 LIMIT $2 OFFSET $3
246 `
247
248 rows, err := r.db.QueryContext(ctx, query, parentURI, limit, offset)
249 if err != nil {
250 return nil, fmt.Errorf("failed to list comments by parent: %w", err)
251 }
252 defer func() {
253 if err := rows.Close(); err != nil {
254 log.Printf("Failed to close rows: %v", err)
255 }
256 }()
257
258 var result []*comments.Comment
259 for rows.Next() {
260 var comment comments.Comment
261 var langs pq.StringArray
262
263 err := rows.Scan(
264 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
265 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
266 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
267 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
268 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
269 )
270 if err != nil {
271 return nil, fmt.Errorf("failed to scan comment: %w", err)
272 }
273
274 comment.Langs = langs
275 result = append(result, &comment)
276 }
277
278 if err = rows.Err(); err != nil {
279 return nil, fmt.Errorf("error iterating comments: %w", err)
280 }
281
282 return result, nil
283}
284
285// CountByParent counts direct replies to a post or comment
286// Used for showing reply counts in threading UI
287func (r *postgresCommentRepo) CountByParent(ctx context.Context, parentURI string) (int, error) {
288 query := `
289 SELECT COUNT(*)
290 FROM comments
291 WHERE parent_uri = $1 AND deleted_at IS NULL
292 `
293
294 var count int
295 err := r.db.QueryRowContext(ctx, query, parentURI).Scan(&count)
296 if err != nil {
297 return 0, fmt.Errorf("failed to count comments by parent: %w", err)
298 }
299
300 return count, nil
301}
302
303// ListByCommenter retrieves all active comments by a specific user
304// Future: Used for user comment history
305func (r *postgresCommentRepo) ListByCommenter(ctx context.Context, commenterDID string, limit, offset int) ([]*comments.Comment, error) {
306 query := `
307 SELECT
308 id, uri, cid, rkey, commenter_did,
309 root_uri, root_cid, parent_uri, parent_cid,
310 content, content_facets, embed, content_labels, langs,
311 created_at, indexed_at, deleted_at,
312 upvote_count, downvote_count, score, reply_count
313 FROM comments
314 WHERE commenter_did = $1 AND deleted_at IS NULL
315 ORDER BY created_at DESC
316 LIMIT $2 OFFSET $3
317 `
318
319 rows, err := r.db.QueryContext(ctx, query, commenterDID, limit, offset)
320 if err != nil {
321 return nil, fmt.Errorf("failed to list comments by commenter: %w", err)
322 }
323 defer func() {
324 if err := rows.Close(); err != nil {
325 log.Printf("Failed to close rows: %v", err)
326 }
327 }()
328
329 var result []*comments.Comment
330 for rows.Next() {
331 var comment comments.Comment
332 var langs pq.StringArray
333
334 err := rows.Scan(
335 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID,
336 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID,
337 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs,
338 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt,
339 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount,
340 )
341 if err != nil {
342 return nil, fmt.Errorf("failed to scan comment: %w", err)
343 }
344
345 comment.Langs = langs
346 result = append(result, &comment)
347 }
348
349 if err = rows.Err(); err != nil {
350 return nil, fmt.Errorf("error iterating comments: %w", err)
351 }
352
353 return result, nil
354}