A community based topic aggregation platform built on atproto

feat(db): add aggregator database schema

Create comprehensive database schema for aggregator system with
3 tables, 2 triggers, and optimized indexes.

Tables:
- aggregators: Service declarations indexed from Jetstream
- aggregator_authorizations: Community authorizations
- aggregator_posts: Rate limiting tracking (AppView-only)

Key features:
- Optimized indexes for <5ms authorization checks
- Partial indexes WHERE enabled=true for performance
- Foreign keys with CASCADE delete
- Auto-updating stats via triggers

Triggers:
- update_aggregator_communities_count: Tracks communities_using
- update_aggregator_posts_count: Tracks posts_created

Security:
- Audit trail fields (created_by, disabled_by, disabled_at)
- Unique constraint on (aggregator_did, community_did)
- NOT NULL constraints on required fields

Performance:
- idx_aggregator_auth_lookup: Fast (aggregator, community, enabled) checks
- idx_aggregator_posts_rate_limit: Fast rate limit queries
- idx_aggregators_created_at: Sorting by creation date

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

Changed files
+214
internal
+214
internal/db/migrations/012_create_aggregators_tables.sql
···
+
-- +goose Up
+
-- Create aggregators tables for indexing aggregator service declarations and authorizations
+
-- These records are indexed from Jetstream firehose consumer
+
+
-- ============================================================================
+
-- Table: aggregators
+
-- Purpose: Index aggregator service declarations from social.coves.aggregator.service records
+
-- Source: Aggregator's own repository (at://aggregator_did/social.coves.aggregator.service/self)
+
-- ============================================================================
+
CREATE TABLE aggregators (
+
-- Primary identity
+
did TEXT PRIMARY KEY, -- Aggregator's DID (must match repo DID)
+
+
-- Service metadata (from lexicon)
+
display_name TEXT NOT NULL, -- Human-readable name
+
description TEXT, -- What this aggregator does
+
config_schema JSONB, -- JSON Schema for community config validation
+
avatar_url TEXT, -- Avatar image URL (extracted from blob)
+
source_url TEXT, -- URL to source code (transparency)
+
maintainer_did TEXT, -- DID of maintainer
+
+
-- atProto record metadata
+
record_uri TEXT NOT NULL UNIQUE, -- AT-URI of service declaration record
+
record_cid TEXT NOT NULL, -- CID of current record version
+
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), -- When the aggregator service was created (from lexicon createdAt field)
+
indexed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), -- When indexed/updated by AppView
+
+
-- Cached stats (updated by aggregator_posts table triggers/queries)
+
communities_using INTEGER NOT NULL DEFAULT 0, -- Count of communities with enabled authorizations
+
posts_created BIGINT NOT NULL DEFAULT 0 -- Total posts created by this aggregator
+
);
+
+
-- Indexes for discovery and lookups
+
CREATE INDEX idx_aggregators_created_at ON aggregators(created_at DESC);
+
CREATE INDEX idx_aggregators_indexed_at ON aggregators(indexed_at DESC);
+
CREATE INDEX idx_aggregators_maintainer ON aggregators(maintainer_did);
+
+
-- Comments
+
COMMENT ON TABLE aggregators IS 'Aggregator service declarations indexed from social.coves.aggregator.service records';
+
COMMENT ON COLUMN aggregators.did IS 'DID of the aggregator service (matches repo DID)';
+
COMMENT ON COLUMN aggregators.config_schema IS 'JSON Schema defining what config options communities can set';
+
COMMENT ON COLUMN aggregators.created_at IS 'When the aggregator service was created (from lexicon record createdAt field)';
+
COMMENT ON COLUMN aggregators.communities_using IS 'Cached count of communities with enabled=true authorizations';
+
+
+
-- ============================================================================
+
-- Table: aggregator_authorizations
+
-- Purpose: Index community authorization records for aggregators
+
-- Source: Community's repository (at://community_did/social.coves.aggregator.authorization/rkey)
+
-- ============================================================================
+
CREATE TABLE aggregator_authorizations (
+
id BIGSERIAL PRIMARY KEY,
+
+
-- Authorization identity
+
aggregator_did TEXT NOT NULL, -- DID of authorized aggregator
+
community_did TEXT NOT NULL, -- DID of community granting access
+
+
-- Authorization state
+
enabled BOOLEAN NOT NULL DEFAULT true, -- Whether aggregator is currently active
+
config JSONB, -- Community-specific config (validated against aggregator's schema)
+
+
-- Audit trail (from lexicon)
+
created_at TIMESTAMPTZ NOT NULL, -- When authorization was created
+
created_by TEXT NOT NULL, -- DID of moderator who authorized (set by API, not client)
+
disabled_at TIMESTAMPTZ, -- When authorization was disabled (if enabled=false)
+
disabled_by TEXT, -- DID of moderator who disabled
+
+
-- atProto record metadata
+
record_uri TEXT NOT NULL UNIQUE, -- AT-URI of authorization record
+
record_cid TEXT NOT NULL, -- CID of current record version
+
indexed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), -- When indexed/updated by AppView
+
+
-- Constraints
+
UNIQUE(aggregator_did, community_did), -- One authorization per aggregator per community
+
CONSTRAINT fk_aggregator FOREIGN KEY (aggregator_did) REFERENCES aggregators(did) ON DELETE CASCADE,
+
CONSTRAINT fk_community FOREIGN KEY (community_did) REFERENCES communities(did) ON DELETE CASCADE
+
);
+
+
-- Indexes for authorization checks (CRITICAL PATH - used on every aggregator post)
+
CREATE INDEX idx_aggregator_auth_agg_enabled ON aggregator_authorizations(aggregator_did, enabled) WHERE enabled = true;
+
CREATE INDEX idx_aggregator_auth_comm_enabled ON aggregator_authorizations(community_did, enabled) WHERE enabled = true;
+
CREATE INDEX idx_aggregator_auth_lookup ON aggregator_authorizations(aggregator_did, community_did, enabled);
+
+
-- Indexes for listing/discovery
+
CREATE INDEX idx_aggregator_auth_agg_did ON aggregator_authorizations(aggregator_did, created_at DESC);
+
CREATE INDEX idx_aggregator_auth_comm_did ON aggregator_authorizations(community_did, created_at DESC);
+
+
-- Comments
+
COMMENT ON TABLE aggregator_authorizations IS 'Community authorizations for aggregators indexed from social.coves.aggregator.authorization records';
+
COMMENT ON COLUMN aggregator_authorizations.config IS 'Community-specific config, validated against aggregators.config_schema';
+
COMMENT ON INDEX idx_aggregator_auth_lookup IS 'CRITICAL: Fast lookup for post creation authorization checks';
+
+
+
-- ============================================================================
+
-- Table: aggregator_posts
+
-- Purpose: Track posts created by aggregators for rate limiting and stats
+
-- Note: This is AppView-only data, not from lexicon records
+
-- ============================================================================
+
CREATE TABLE aggregator_posts (
+
id BIGSERIAL PRIMARY KEY,
+
+
-- Post identity
+
aggregator_did TEXT NOT NULL, -- DID of aggregator that created the post
+
community_did TEXT NOT NULL, -- DID of community post was created in
+
post_uri TEXT NOT NULL, -- AT-URI of the post record
+
post_cid TEXT NOT NULL, -- CID of the post
+
+
-- Timestamp
+
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), -- When post was created
+
+
-- Constraints
+
UNIQUE(post_uri), -- Each post tracked once
+
CONSTRAINT fk_aggregator_posts_agg FOREIGN KEY (aggregator_did) REFERENCES aggregators(did) ON DELETE CASCADE,
+
CONSTRAINT fk_aggregator_posts_comm FOREIGN KEY (community_did) REFERENCES communities(did) ON DELETE CASCADE
+
);
+
+
-- Indexes for rate limiting queries (CRITICAL PATH - used on every aggregator post)
+
CREATE INDEX idx_aggregator_posts_rate_limit ON aggregator_posts(aggregator_did, community_did, created_at DESC);
+
+
-- Indexes for stats
+
CREATE INDEX idx_aggregator_posts_agg_did ON aggregator_posts(aggregator_did, created_at DESC);
+
CREATE INDEX idx_aggregator_posts_comm_did ON aggregator_posts(community_did, created_at DESC);
+
+
-- Comments
+
COMMENT ON TABLE aggregator_posts IS 'AppView-only tracking of posts created by aggregators for rate limiting and stats';
+
COMMENT ON INDEX idx_aggregator_posts_rate_limit IS 'CRITICAL: Fast rate limit checks (posts in last hour per community)';
+
+
+
-- ============================================================================
+
-- Trigger: Update aggregator stats when authorizations change
+
-- Purpose: Keep aggregators.communities_using count accurate
+
-- ============================================================================
+
-- +goose StatementBegin
+
CREATE OR REPLACE FUNCTION update_aggregator_communities_count()
+
RETURNS TRIGGER AS $$
+
BEGIN
+
-- Recalculate communities_using count for affected aggregator
+
IF TG_OP = 'DELETE' THEN
+
UPDATE aggregators
+
SET communities_using = (
+
SELECT COUNT(*)
+
FROM aggregator_authorizations
+
WHERE aggregator_did = OLD.aggregator_did
+
AND enabled = true
+
)
+
WHERE did = OLD.aggregator_did;
+
RETURN OLD;
+
ELSE
+
UPDATE aggregators
+
SET communities_using = (
+
SELECT COUNT(*)
+
FROM aggregator_authorizations
+
WHERE aggregator_did = NEW.aggregator_did
+
AND enabled = true
+
)
+
WHERE did = NEW.aggregator_did;
+
RETURN NEW;
+
END IF;
+
END;
+
$$ LANGUAGE plpgsql;
+
-- +goose StatementEnd
+
+
CREATE TRIGGER trigger_update_aggregator_communities_count
+
AFTER INSERT OR UPDATE OR DELETE ON aggregator_authorizations
+
FOR EACH ROW
+
EXECUTE FUNCTION update_aggregator_communities_count();
+
+
COMMENT ON FUNCTION update_aggregator_communities_count IS 'Maintains aggregators.communities_using count when authorizations change';
+
+
+
-- ============================================================================
+
-- Trigger: Update aggregator stats when posts are created
+
-- Purpose: Keep aggregators.posts_created count accurate
+
-- ============================================================================
+
-- +goose StatementBegin
+
CREATE OR REPLACE FUNCTION update_aggregator_posts_count()
+
RETURNS TRIGGER AS $$
+
BEGIN
+
IF TG_OP = 'INSERT' THEN
+
UPDATE aggregators
+
SET posts_created = posts_created + 1
+
WHERE did = NEW.aggregator_did;
+
RETURN NEW;
+
ELSIF TG_OP = 'DELETE' THEN
+
UPDATE aggregators
+
SET posts_created = posts_created - 1
+
WHERE did = OLD.aggregator_did;
+
RETURN OLD;
+
END IF;
+
END;
+
$$ LANGUAGE plpgsql;
+
-- +goose StatementEnd
+
+
CREATE TRIGGER trigger_update_aggregator_posts_count
+
AFTER INSERT OR DELETE ON aggregator_posts
+
FOR EACH ROW
+
EXECUTE FUNCTION update_aggregator_posts_count();
+
+
COMMENT ON FUNCTION update_aggregator_posts_count IS 'Maintains aggregators.posts_created count when posts are tracked';
+
+
+
-- +goose Down
+
-- Drop triggers first
+
DROP TRIGGER IF EXISTS trigger_update_aggregator_posts_count ON aggregator_posts;
+
DROP TRIGGER IF EXISTS trigger_update_aggregator_communities_count ON aggregator_authorizations;
+
+
-- Drop functions
+
DROP FUNCTION IF EXISTS update_aggregator_posts_count();
+
DROP FUNCTION IF EXISTS update_aggregator_communities_count();
+
+
-- Drop tables in reverse order (respects foreign keys)
+
DROP TABLE IF EXISTS aggregator_posts CASCADE;
+
DROP TABLE IF EXISTS aggregator_authorizations CASCADE;
+
DROP TABLE IF EXISTS aggregators CASCADE;