A community based topic aggregation platform built on atproto

feat(posts): add database schema and repository layer

- Add migration 011: posts table with indexes
- Add foreign keys to users and communities
- Add indexes for common query patterns (community feed, author, score)
- Add PostgreSQL repository implementation
- Add Create() and GetByURI() methods
- Add JSON serialization for facets, embeds, labels

Posts table supports:
- AT-URI, CID, rkey for atProto compliance
- Title, content, facets, embed, labels
- Vote counts and score (denormalized for performance)
- Soft delete with deleted_at timestamp

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

Changed files
+189
internal
db
+51
internal/db/migrations/011_create_posts_table.sql
···
···
+
-- +goose Up
+
-- Create posts table for AppView indexing
+
-- Posts are indexed from the firehose after being written to community repositories
+
CREATE TABLE posts (
+
id BIGSERIAL PRIMARY KEY,
+
uri TEXT UNIQUE NOT NULL, -- AT-URI (at://community_did/social.coves.post.record/rkey)
+
cid TEXT NOT NULL, -- Content ID
+
rkey TEXT NOT NULL, -- Record key (TID)
+
author_did TEXT NOT NULL, -- Author's DID (from record metadata)
+
community_did TEXT NOT NULL, -- Community DID (from AT-URI repo field)
+
+
-- Content (all nullable per lexicon)
+
title TEXT, -- Post title
+
content TEXT, -- Post content/body
+
content_facets JSONB, -- Rich text facets (app.bsky.richtext.facet)
+
embed JSONB, -- Embedded content (images, video, external, record)
+
content_labels TEXT[], -- Self-applied labels (nsfw, spoiler, violence)
+
+
-- Timestamps
+
created_at TIMESTAMPTZ NOT NULL, -- Author's timestamp from record
+
edited_at TIMESTAMPTZ, -- Last edit timestamp (future)
+
indexed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), -- When indexed by AppView
+
deleted_at TIMESTAMPTZ, -- Soft delete (for firehose delete events)
+
+
-- Stats (denormalized for performance)
+
upvote_count INT NOT NULL DEFAULT 0,
+
downvote_count INT NOT NULL DEFAULT 0,
+
score INT NOT NULL DEFAULT 0, -- upvote_count - downvote_count (for sorting)
+
comment_count INT NOT NULL DEFAULT 0,
+
+
-- Foreign keys
+
CONSTRAINT fk_author FOREIGN KEY (author_did) REFERENCES users(did) ON DELETE CASCADE,
+
CONSTRAINT fk_community FOREIGN KEY (community_did) REFERENCES communities(did) ON DELETE CASCADE
+
);
+
+
-- Indexes for common query patterns
+
CREATE INDEX idx_posts_community_created ON posts(community_did, created_at DESC) WHERE deleted_at IS NULL;
+
CREATE INDEX idx_posts_community_score ON posts(community_did, score DESC, created_at DESC) WHERE deleted_at IS NULL;
+
CREATE INDEX idx_posts_author ON posts(author_did, created_at DESC);
+
CREATE INDEX idx_posts_uri ON posts(uri);
+
+
-- Index for full-text search on content (future)
+
-- CREATE INDEX idx_posts_content_search ON posts USING gin(to_tsvector('english', content)) WHERE deleted_at IS NULL;
+
+
-- Comment on table
+
COMMENT ON TABLE posts IS 'Posts indexed from community repositories via Jetstream firehose consumer';
+
COMMENT ON COLUMN posts.uri IS 'AT-URI in format: at://community_did/social.coves.post.record/rkey';
+
COMMENT ON COLUMN posts.score IS 'Computed as upvote_count - downvote_count for ranking algorithms';
+
+
-- +goose Down
+
DROP TABLE IF EXISTS posts CASCADE;
+138
internal/db/postgres/post_repo.go
···
···
+
package postgres
+
+
import (
+
"Coves/internal/core/posts"
+
"context"
+
"database/sql"
+
"encoding/json"
+
"fmt"
+
"strings"
+
+
"github.com/lib/pq"
+
)
+
+
type postgresPostRepo struct {
+
db *sql.DB
+
}
+
+
// NewPostRepository creates a new PostgreSQL post repository
+
func NewPostRepository(db *sql.DB) posts.Repository {
+
return &postgresPostRepo{db: db}
+
}
+
+
// Create inserts a new post into the posts table
+
// Called by Jetstream consumer after post is created on PDS
+
func (r *postgresPostRepo) Create(ctx context.Context, post *posts.Post) error {
+
// Serialize JSON fields for storage
+
var facetsJSON, embedJSON sql.NullString
+
+
if post.ContentFacets != nil {
+
facetsJSON.String = *post.ContentFacets
+
facetsJSON.Valid = true
+
}
+
+
if post.Embed != nil {
+
embedJSON.String = *post.Embed
+
embedJSON.Valid = true
+
}
+
+
// Convert content labels to PostgreSQL array
+
var labelsArray pq.StringArray
+
if post.ContentLabels != nil {
+
// Parse JSON array string to []string
+
var labels []string
+
if err := json.Unmarshal([]byte(*post.ContentLabels), &labels); err == nil {
+
labelsArray = labels
+
}
+
}
+
+
query := `
+
INSERT INTO posts (
+
uri, cid, rkey, author_did, community_did,
+
title, content, content_facets, embed, content_labels,
+
created_at, indexed_at
+
) VALUES (
+
$1, $2, $3, $4, $5,
+
$6, $7, $8, $9, $10,
+
$11, NOW()
+
)
+
RETURNING id, indexed_at
+
`
+
+
err := r.db.QueryRowContext(
+
ctx, query,
+
post.URI, post.CID, post.RKey, post.AuthorDID, post.CommunityDID,
+
post.Title, post.Content, facetsJSON, embedJSON, labelsArray,
+
post.CreatedAt,
+
).Scan(&post.ID, &post.IndexedAt)
+
if err != nil {
+
// Check for duplicate URI (post already indexed)
+
if strings.Contains(err.Error(), "duplicate key") && strings.Contains(err.Error(), "posts_uri_key") {
+
return fmt.Errorf("post already indexed: %s", post.URI)
+
}
+
+
// Check for foreign key violations
+
if strings.Contains(err.Error(), "violates foreign key constraint") {
+
if strings.Contains(err.Error(), "fk_author") {
+
return fmt.Errorf("author DID not found: %s", post.AuthorDID)
+
}
+
if strings.Contains(err.Error(), "fk_community") {
+
return fmt.Errorf("community DID not found: %s", post.CommunityDID)
+
}
+
}
+
+
return fmt.Errorf("failed to insert post: %w", err)
+
}
+
+
return nil
+
}
+
+
// GetByURI retrieves a post by its AT-URI
+
// Used for E2E test verification and future GET endpoint
+
func (r *postgresPostRepo) GetByURI(ctx context.Context, uri string) (*posts.Post, error) {
+
query := `
+
SELECT
+
id, uri, cid, rkey, author_did, community_did,
+
title, content, content_facets, embed, content_labels,
+
created_at, edited_at, indexed_at, deleted_at,
+
upvote_count, downvote_count, score, comment_count
+
FROM posts
+
WHERE uri = $1
+
`
+
+
var post posts.Post
+
var facetsJSON, embedJSON sql.NullString
+
var contentLabels pq.StringArray
+
+
err := r.db.QueryRowContext(ctx, query, uri).Scan(
+
&post.ID, &post.URI, &post.CID, &post.RKey,
+
&post.AuthorDID, &post.CommunityDID,
+
&post.Title, &post.Content, &facetsJSON, &embedJSON, &contentLabels,
+
&post.CreatedAt, &post.EditedAt, &post.IndexedAt, &post.DeletedAt,
+
&post.UpvoteCount, &post.DownvoteCount, &post.Score, &post.CommentCount,
+
)
+
+
if err == sql.ErrNoRows {
+
return nil, posts.ErrNotFound
+
}
+
if err != nil {
+
return nil, fmt.Errorf("failed to get post by URI: %w", err)
+
}
+
+
// Convert SQL types back to Go types
+
if facetsJSON.Valid {
+
post.ContentFacets = &facetsJSON.String
+
}
+
if embedJSON.Valid {
+
post.Embed = &embedJSON.String
+
}
+
if len(contentLabels) > 0 {
+
labelsJSON, marshalErr := json.Marshal(contentLabels)
+
if marshalErr == nil {
+
labelsStr := string(labelsJSON)
+
post.ContentLabels = &labelsStr
+
}
+
}
+
+
return &post, nil
+
}