A community-based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "Coves/internal/core/comments" 5 "context" 6 "database/sql" 7 "fmt" 8 "log" 9 "strings" 10 11 "github.com/lib/pq" 12) 13 14type postgresCommentRepo struct { 15 db *sql.DB 16} 17 18// NewCommentRepository creates a new PostgreSQL comment repository 19func NewCommentRepository(db *sql.DB) comments.Repository { 20 return &postgresCommentRepo{db: db} 21} 22 23// Create inserts a new comment into the comments table 24// Called by Jetstream consumer after comment is created on PDS 25// Idempotent: Returns success if comment already exists (for Jetstream replays) 26func (r *postgresCommentRepo) Create(ctx context.Context, comment *comments.Comment) error { 27 query := ` 28 INSERT INTO comments ( 29 uri, cid, rkey, commenter_did, 30 root_uri, root_cid, parent_uri, parent_cid, 31 content, content_facets, embed, content_labels, langs, 32 created_at, indexed_at 33 ) VALUES ( 34 $1, $2, $3, $4, 35 $5, $6, $7, $8, 36 $9, $10, $11, $12, $13, 37 $14, NOW() 38 ) 39 ON CONFLICT (uri) DO NOTHING 40 RETURNING id, indexed_at 41 ` 42 43 err := r.db.QueryRowContext( 44 ctx, query, 45 comment.URI, comment.CID, comment.RKey, comment.CommenterDID, 46 comment.RootURI, comment.RootCID, comment.ParentURI, comment.ParentCID, 47 comment.Content, comment.ContentFacets, comment.Embed, comment.ContentLabels, pq.Array(comment.Langs), 48 comment.CreatedAt, 49 ).Scan(&comment.ID, &comment.IndexedAt) 50 51 // ON CONFLICT DO NOTHING returns no rows if duplicate - this is OK (idempotent) 52 if err == sql.ErrNoRows { 53 return nil // Comment already exists, no error for idempotency 54 } 55 56 if err != nil { 57 // Check for unique constraint violation 58 if strings.Contains(err.Error(), "duplicate key") { 59 return comments.ErrCommentAlreadyExists 60 } 61 62 return fmt.Errorf("failed to insert comment: %w", err) 63 } 64 65 return nil 66} 67 68// Update modifies an existing comment's content fields 69// Called by Jetstream consumer after comment is updated on PDS 70// 
Preserves vote counts and created_at timestamp 71func (r *postgresCommentRepo) Update(ctx context.Context, comment *comments.Comment) error { 72 query := ` 73 UPDATE comments 74 SET 75 cid = $1, 76 content = $2, 77 content_facets = $3, 78 embed = $4, 79 content_labels = $5, 80 langs = $6 81 WHERE uri = $7 AND deleted_at IS NULL 82 RETURNING id, indexed_at, created_at, upvote_count, downvote_count, score, reply_count 83 ` 84 85 err := r.db.QueryRowContext( 86 ctx, query, 87 comment.CID, 88 comment.Content, 89 comment.ContentFacets, 90 comment.Embed, 91 comment.ContentLabels, 92 pq.Array(comment.Langs), 93 comment.URI, 94 ).Scan( 95 &comment.ID, 96 &comment.IndexedAt, 97 &comment.CreatedAt, 98 &comment.UpvoteCount, 99 &comment.DownvoteCount, 100 &comment.Score, 101 &comment.ReplyCount, 102 ) 103 104 if err == sql.ErrNoRows { 105 return comments.ErrCommentNotFound 106 } 107 if err != nil { 108 return fmt.Errorf("failed to update comment: %w", err) 109 } 110 111 return nil 112} 113 114// GetByURI retrieves a comment by its AT-URI 115// Used by Jetstream consumer for UPDATE/DELETE operations 116func (r *postgresCommentRepo) GetByURI(ctx context.Context, uri string) (*comments.Comment, error) { 117 query := ` 118 SELECT 119 id, uri, cid, rkey, commenter_did, 120 root_uri, root_cid, parent_uri, parent_cid, 121 content, content_facets, embed, content_labels, langs, 122 created_at, indexed_at, deleted_at, 123 upvote_count, downvote_count, score, reply_count 124 FROM comments 125 WHERE uri = $1 126 ` 127 128 var comment comments.Comment 129 var langs pq.StringArray 130 131 err := r.db.QueryRowContext(ctx, query, uri).Scan( 132 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 133 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 134 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 135 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 136 &comment.UpvoteCount, 
&comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 137 ) 138 139 if err == sql.ErrNoRows { 140 return nil, comments.ErrCommentNotFound 141 } 142 if err != nil { 143 return nil, fmt.Errorf("failed to get comment by URI: %w", err) 144 } 145 146 comment.Langs = langs 147 148 return &comment, nil 149} 150 151// Delete soft-deletes a comment (sets deleted_at) 152// Called by Jetstream consumer after comment is deleted from PDS 153// Idempotent: Returns success if comment already deleted 154func (r *postgresCommentRepo) Delete(ctx context.Context, uri string) error { 155 query := ` 156 UPDATE comments 157 SET deleted_at = NOW() 158 WHERE uri = $1 AND deleted_at IS NULL 159 ` 160 161 result, err := r.db.ExecContext(ctx, query, uri) 162 if err != nil { 163 return fmt.Errorf("failed to delete comment: %w", err) 164 } 165 166 rowsAffected, err := result.RowsAffected() 167 if err != nil { 168 return fmt.Errorf("failed to check delete result: %w", err) 169 } 170 171 // Idempotent: If no rows affected, comment already deleted (OK for Jetstream replays) 172 if rowsAffected == 0 { 173 return nil 174 } 175 176 return nil 177} 178 179// ListByRoot retrieves all active comments in a thread (flat) 180// Used for fetching entire comment threads on posts 181func (r *postgresCommentRepo) ListByRoot(ctx context.Context, rootURI string, limit, offset int) ([]*comments.Comment, error) { 182 query := ` 183 SELECT 184 id, uri, cid, rkey, commenter_did, 185 root_uri, root_cid, parent_uri, parent_cid, 186 content, content_facets, embed, content_labels, langs, 187 created_at, indexed_at, deleted_at, 188 upvote_count, downvote_count, score, reply_count 189 FROM comments 190 WHERE root_uri = $1 AND deleted_at IS NULL 191 ORDER BY created_at ASC 192 LIMIT $2 OFFSET $3 193 ` 194 195 rows, err := r.db.QueryContext(ctx, query, rootURI, limit, offset) 196 if err != nil { 197 return nil, fmt.Errorf("failed to list comments by root: %w", err) 198 } 199 defer func() { 200 if err := 
rows.Close(); err != nil { 201 log.Printf("Failed to close rows: %v", err) 202 } 203 }() 204 205 var result []*comments.Comment 206 for rows.Next() { 207 var comment comments.Comment 208 var langs pq.StringArray 209 210 err := rows.Scan( 211 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 212 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 213 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 214 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 215 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 216 ) 217 if err != nil { 218 return nil, fmt.Errorf("failed to scan comment: %w", err) 219 } 220 221 comment.Langs = langs 222 result = append(result, &comment) 223 } 224 225 if err = rows.Err(); err != nil { 226 return nil, fmt.Errorf("error iterating comments: %w", err) 227 } 228 229 return result, nil 230} 231 232// ListByParent retrieves direct replies to a post or comment 233// Used for building nested/threaded comment views 234func (r *postgresCommentRepo) ListByParent(ctx context.Context, parentURI string, limit, offset int) ([]*comments.Comment, error) { 235 query := ` 236 SELECT 237 id, uri, cid, rkey, commenter_did, 238 root_uri, root_cid, parent_uri, parent_cid, 239 content, content_facets, embed, content_labels, langs, 240 created_at, indexed_at, deleted_at, 241 upvote_count, downvote_count, score, reply_count 242 FROM comments 243 WHERE parent_uri = $1 AND deleted_at IS NULL 244 ORDER BY created_at ASC 245 LIMIT $2 OFFSET $3 246 ` 247 248 rows, err := r.db.QueryContext(ctx, query, parentURI, limit, offset) 249 if err != nil { 250 return nil, fmt.Errorf("failed to list comments by parent: %w", err) 251 } 252 defer func() { 253 if err := rows.Close(); err != nil { 254 log.Printf("Failed to close rows: %v", err) 255 } 256 }() 257 258 var result []*comments.Comment 259 for rows.Next() { 260 var comment comments.Comment 
261 var langs pq.StringArray 262 263 err := rows.Scan( 264 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 265 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 266 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 267 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 268 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 269 ) 270 if err != nil { 271 return nil, fmt.Errorf("failed to scan comment: %w", err) 272 } 273 274 comment.Langs = langs 275 result = append(result, &comment) 276 } 277 278 if err = rows.Err(); err != nil { 279 return nil, fmt.Errorf("error iterating comments: %w", err) 280 } 281 282 return result, nil 283} 284 285// CountByParent counts direct replies to a post or comment 286// Used for showing reply counts in threading UI 287func (r *postgresCommentRepo) CountByParent(ctx context.Context, parentURI string) (int, error) { 288 query := ` 289 SELECT COUNT(*) 290 FROM comments 291 WHERE parent_uri = $1 AND deleted_at IS NULL 292 ` 293 294 var count int 295 err := r.db.QueryRowContext(ctx, query, parentURI).Scan(&count) 296 if err != nil { 297 return 0, fmt.Errorf("failed to count comments by parent: %w", err) 298 } 299 300 return count, nil 301} 302 303// ListByCommenter retrieves all active comments by a specific user 304// Future: Used for user comment history 305func (r *postgresCommentRepo) ListByCommenter(ctx context.Context, commenterDID string, limit, offset int) ([]*comments.Comment, error) { 306 query := ` 307 SELECT 308 id, uri, cid, rkey, commenter_did, 309 root_uri, root_cid, parent_uri, parent_cid, 310 content, content_facets, embed, content_labels, langs, 311 created_at, indexed_at, deleted_at, 312 upvote_count, downvote_count, score, reply_count 313 FROM comments 314 WHERE commenter_did = $1 AND deleted_at IS NULL 315 ORDER BY created_at DESC 316 LIMIT $2 OFFSET $3 317 ` 318 319 rows, err := 
r.db.QueryContext(ctx, query, commenterDID, limit, offset) 320 if err != nil { 321 return nil, fmt.Errorf("failed to list comments by commenter: %w", err) 322 } 323 defer func() { 324 if err := rows.Close(); err != nil { 325 log.Printf("Failed to close rows: %v", err) 326 } 327 }() 328 329 var result []*comments.Comment 330 for rows.Next() { 331 var comment comments.Comment 332 var langs pq.StringArray 333 334 err := rows.Scan( 335 &comment.ID, &comment.URI, &comment.CID, &comment.RKey, &comment.CommenterDID, 336 &comment.RootURI, &comment.RootCID, &comment.ParentURI, &comment.ParentCID, 337 &comment.Content, &comment.ContentFacets, &comment.Embed, &comment.ContentLabels, &langs, 338 &comment.CreatedAt, &comment.IndexedAt, &comment.DeletedAt, 339 &comment.UpvoteCount, &comment.DownvoteCount, &comment.Score, &comment.ReplyCount, 340 ) 341 if err != nil { 342 return nil, fmt.Errorf("failed to scan comment: %w", err) 343 } 344 345 comment.Langs = langs 346 result = append(result, &comment) 347 } 348 349 if err = rows.Err(); err != nil { 350 return nil, fmt.Errorf("error iterating comments: %w", err) 351 } 352 353 return result, nil 354}