A community based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "strings" 8 9 "Coves/internal/core/posts" 10) 11 12type postgresPostRepo struct { 13 db *sql.DB 14} 15 16// NewPostRepository creates a new PostgreSQL post repository 17func NewPostRepository(db *sql.DB) posts.Repository { 18 return &postgresPostRepo{db: db} 19} 20 21// Create inserts a new post into the posts table 22// Called by Jetstream consumer after post is created on PDS 23func (r *postgresPostRepo) Create(ctx context.Context, post *posts.Post) error { 24 // Serialize JSON fields for storage 25 var facetsJSON, embedJSON sql.NullString 26 27 if post.ContentFacets != nil { 28 facetsJSON.String = *post.ContentFacets 29 facetsJSON.Valid = true 30 } 31 32 if post.Embed != nil { 33 embedJSON.String = *post.Embed 34 embedJSON.Valid = true 35 } 36 37 // Store content labels as JSONB 38 // post.ContentLabels contains com.atproto.label.defs#selfLabels JSON: {"values":[{"val":"nsfw","neg":false}]} 39 // Store the full JSON blob to preserve the 'neg' field and future extensions 40 var labelsJSON sql.NullString 41 if post.ContentLabels != nil { 42 labelsJSON.String = *post.ContentLabels 43 labelsJSON.Valid = true 44 } 45 46 query := ` 47 INSERT INTO posts ( 48 uri, cid, rkey, author_did, community_did, 49 title, content, content_facets, embed, content_labels, 50 created_at, indexed_at 51 ) VALUES ( 52 $1, $2, $3, $4, $5, 53 $6, $7, $8, $9, $10, 54 $11, NOW() 55 ) 56 RETURNING id, indexed_at 57 ` 58 59 err := r.db.QueryRowContext( 60 ctx, query, 61 post.URI, post.CID, post.RKey, post.AuthorDID, post.CommunityDID, 62 post.Title, post.Content, facetsJSON, embedJSON, labelsJSON, 63 post.CreatedAt, 64 ).Scan(&post.ID, &post.IndexedAt) 65 if err != nil { 66 // Check for duplicate URI (post already indexed) 67 if strings.Contains(err.Error(), "duplicate key") && strings.Contains(err.Error(), "posts_uri_key") { 68 return fmt.Errorf("post already indexed: %s", post.URI) 69 } 70 71 // Check for foreign key violations 72 if strings.Contains(err.Error(), "violates foreign key constraint") { 73 if strings.Contains(err.Error(), "fk_author") { 74 return fmt.Errorf("author DID not found: %s", post.AuthorDID) 75 } 76 if strings.Contains(err.Error(), "fk_community") { 77 return fmt.Errorf("community DID not found: %s", post.CommunityDID) 78 } 79 } 80 81 return fmt.Errorf("failed to insert post: %w", err) 82 } 83 84 return nil 85} 86 87// GetByURI retrieves a post by its AT-URI 88// Used for E2E test verification and future GET endpoint 89func (r *postgresPostRepo) GetByURI(ctx context.Context, uri string) (*posts.Post, error) { 90 query := ` 91 SELECT 92 id, uri, cid, rkey, author_did, community_did, 93 title, content, content_facets, embed, content_labels, 94 created_at, edited_at, indexed_at, deleted_at, 95 upvote_count, downvote_count, score, comment_count 96 FROM posts 97 WHERE uri = $1 98 ` 99 100 var post posts.Post 101 var facetsJSON, embedJSON, labelsJSON sql.NullString 102 103 err := r.db.QueryRowContext(ctx, query, uri).Scan( 104 &post.ID, &post.URI, &post.CID, &post.RKey, 105 &post.AuthorDID, &post.CommunityDID, 106 &post.Title, &post.Content, &facetsJSON, &embedJSON, &labelsJSON, 107 &post.CreatedAt, &post.EditedAt, &post.IndexedAt, &post.DeletedAt, 108 &post.UpvoteCount, &post.DownvoteCount, &post.Score, &post.CommentCount, 109 ) 110 111 if err == sql.ErrNoRows { 112 return nil, posts.ErrNotFound 113 } 114 if err != nil { 115 return nil, fmt.Errorf("failed to get post by URI: %w", err) 116 } 117 118 // Convert SQL types back to Go types 119 if facetsJSON.Valid { 120 post.ContentFacets = &facetsJSON.String 121 } 122 if embedJSON.Valid { 123 post.Embed = &embedJSON.String 124 } 125 if labelsJSON.Valid { 126 // Labels are stored as JSONB containing full com.atproto.label.defs#selfLabels structure 127 post.ContentLabels = &labelsJSON.String 128 } 129 130 return &post, nil 131}