A community based topic aggregation platform built on atproto

feat(communities): Add PostgreSQL repository with atomic operations

Database schema:
- communities table with pg_trgm indexes for fuzzy search
- community_subscriptions with composite index for lookups
- community_memberships with reputation tracking
- community_moderation (V2 prepared)

Repository features:
- Atomic SubscribeWithCount/UnsubscribeWithCount transactions
- Idempotent operations for Jetstream replay safety
- Full-text search with relevance filtering (>0.2 threshold)
- Pagination and filtering support
- Proper error handling with domain error mapping

Performance optimizations:
- Composite index on (user_did, community_did) for subscription lookups
- GIN indexes for trigram similarity search
- GREATEST(0, count - 1) prevents negative counts

+148
internal/db/migrations/005_create_communities_tables.sql
···
+
-- +goose Up
+
-- Enable pg_trgm extension for fuzzy text search
+
CREATE EXTENSION IF NOT EXISTS pg_trgm;
+
+
-- Communities table: stores community metadata indexed from firehose
+
CREATE TABLE communities (
+
id SERIAL PRIMARY KEY,
+
did TEXT UNIQUE NOT NULL, -- Community DID (did:coves:xxx or did:plc:xxx)
+
handle TEXT UNIQUE NOT NULL, -- Scoped handle (!gaming@coves.social)
+
name TEXT NOT NULL, -- Short community name (local part of handle)
+
display_name TEXT, -- Display name for UI
+
description TEXT, -- Community description
+
description_facets JSONB, -- Rich text annotations
+
avatar_cid TEXT, -- CID of avatar image blob
+
banner_cid TEXT, -- CID of banner image blob
+
+
-- Ownership & hosting
+
owner_did TEXT NOT NULL, -- DID of community owner (instance in V1)
+
created_by_did TEXT NOT NULL, -- DID of user who created community
+
hosted_by_did TEXT NOT NULL, -- DID of hosting instance
+
+
-- Visibility & federation
+
visibility TEXT NOT NULL DEFAULT 'public' CHECK (visibility IN ('public', 'unlisted', 'private')),
+
allow_external_discovery BOOLEAN NOT NULL DEFAULT true,
+
+
-- Moderation
+
moderation_type TEXT CHECK (moderation_type IN ('moderator', 'sortition')),
+
content_warnings TEXT[], -- Array of content warning types
+
+
-- Statistics (cached counts)
+
member_count INTEGER DEFAULT 0,
+
subscriber_count INTEGER DEFAULT 0,
+
post_count INTEGER DEFAULT 0,
+
+
-- Federation metadata (for future cross-platform support)
+
federated_from TEXT, -- 'lemmy', 'coves', etc.
+
federated_id TEXT, -- Original ID on federated platform
+
+
-- Timestamps
+
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
+
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
+
+
-- AT-Proto metadata
+
record_uri TEXT, -- AT-URI of the community profile record
+
record_cid TEXT -- CID of the community profile record
+
);
+
+
-- Indexes for efficient queries
+
CREATE INDEX idx_communities_handle ON communities(handle);
+
CREATE INDEX idx_communities_visibility ON communities(visibility);
+
CREATE INDEX idx_communities_hosted_by ON communities(hosted_by_did);
+
CREATE INDEX idx_communities_created_by ON communities(created_by_did);
+
CREATE INDEX idx_communities_created_at ON communities(created_at);
+
CREATE INDEX idx_communities_name_trgm ON communities USING gin(name gin_trgm_ops); -- For fuzzy search
+
CREATE INDEX idx_communities_description_trgm ON communities USING gin(description gin_trgm_ops);
+
+
-- Subscriptions table: lightweight feed following
+
CREATE TABLE community_subscriptions (
+
id SERIAL PRIMARY KEY,
+
user_did TEXT NOT NULL,
+
community_did TEXT NOT NULL REFERENCES communities(did) ON DELETE CASCADE,
+
subscribed_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
+
+
-- AT-Proto metadata (subscription is a record in user's repo)
+
record_uri TEXT, -- AT-URI of the subscription record
+
record_cid TEXT, -- CID of the subscription record
+
+
UNIQUE(user_did, community_did)
+
);
+
+
-- Indexes for subscription queries
+
CREATE INDEX idx_subscriptions_user ON community_subscriptions(user_did);
+
CREATE INDEX idx_subscriptions_community ON community_subscriptions(community_did);
+
CREATE INDEX idx_subscriptions_user_community ON community_subscriptions(user_did, community_did); -- Composite index for GetSubscription
+
CREATE INDEX idx_subscriptions_subscribed_at ON community_subscriptions(subscribed_at);
+
+
-- Memberships table: active participation & reputation tracking
+
CREATE TABLE community_memberships (
+
id SERIAL PRIMARY KEY,
+
user_did TEXT NOT NULL,
+
community_did TEXT NOT NULL REFERENCES communities(did) ON DELETE CASCADE,
+
+
-- Reputation system
+
reputation_score INTEGER DEFAULT 0, -- Gained through participation
+
contribution_count INTEGER DEFAULT 0, -- Total posts + comments + actions
+
+
-- Activity tracking
+
joined_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
+
last_active_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
+
+
-- Moderation status
+
is_banned BOOLEAN DEFAULT false,
+
is_moderator BOOLEAN DEFAULT false,
+
+
UNIQUE(user_did, community_did)
+
);
+
+
-- Indexes for membership queries
+
CREATE INDEX idx_memberships_user ON community_memberships(user_did);
+
CREATE INDEX idx_memberships_community ON community_memberships(community_did);
+
CREATE INDEX idx_memberships_reputation ON community_memberships(community_did, reputation_score DESC);
+
CREATE INDEX idx_memberships_last_active ON community_memberships(last_active_at);
+
+
-- Community moderation actions table (V2 feature, schema prepared)
+
CREATE TABLE community_moderation (
+
id SERIAL PRIMARY KEY,
+
community_did TEXT NOT NULL REFERENCES communities(did) ON DELETE CASCADE,
+
action TEXT NOT NULL CHECK (action IN ('delist', 'quarantine', 'remove')),
+
reason TEXT,
+
instance_did TEXT NOT NULL, -- Which instance took this action
+
broadcast BOOLEAN DEFAULT false, -- Share moderation signal with network?
+
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
+
expires_at TIMESTAMP WITH TIME ZONE -- Optional: temporary moderation
+
);
+
+
-- Indexes for moderation queries
+
CREATE INDEX idx_moderation_community ON community_moderation(community_did);
+
CREATE INDEX idx_moderation_instance ON community_moderation(instance_did);
+
CREATE INDEX idx_moderation_action ON community_moderation(action);
+
CREATE INDEX idx_moderation_created_at ON community_moderation(created_at);
+
+
-- +goose Down
+
DROP INDEX IF EXISTS idx_moderation_created_at;
+
DROP INDEX IF EXISTS idx_moderation_action;
+
DROP INDEX IF EXISTS idx_moderation_instance;
+
DROP INDEX IF EXISTS idx_moderation_community;
+
DROP TABLE IF EXISTS community_moderation;
+
+
DROP INDEX IF EXISTS idx_memberships_last_active;
+
DROP INDEX IF EXISTS idx_memberships_reputation;
+
DROP INDEX IF EXISTS idx_memberships_community;
+
DROP INDEX IF EXISTS idx_memberships_user;
+
DROP TABLE IF EXISTS community_memberships;
+
+
DROP INDEX IF EXISTS idx_subscriptions_subscribed_at;
+
DROP INDEX IF EXISTS idx_subscriptions_user_community;
+
DROP INDEX IF EXISTS idx_subscriptions_community;
+
DROP INDEX IF EXISTS idx_subscriptions_user;
+
DROP TABLE IF EXISTS community_subscriptions;
+
+
DROP INDEX IF EXISTS idx_communities_description_trgm;
+
DROP INDEX IF EXISTS idx_communities_name_trgm;
+
DROP INDEX IF EXISTS idx_communities_created_at;
+
DROP INDEX IF EXISTS idx_communities_created_by;
+
DROP INDEX IF EXISTS idx_communities_hosted_by;
+
DROP INDEX IF EXISTS idx_communities_visibility;
+
DROP INDEX IF EXISTS idx_communities_handle;
+
DROP TABLE IF EXISTS communities;
+495
internal/db/postgres/community_repo.go
···
+
package postgres
+
+
import (
+
"context"
+
"database/sql"
+
"fmt"
+
"strings"
+
+
"Coves/internal/core/communities"
+
"github.com/lib/pq"
+
)
+
+
type postgresCommunityRepo struct {
+
db *sql.DB
+
}
+
+
// NewCommunityRepository creates a new PostgreSQL community repository
+
func NewCommunityRepository(db *sql.DB) communities.Repository {
+
return &postgresCommunityRepo{db: db}
+
}
+
+
// Create inserts a new community into the communities table
+
func (r *postgresCommunityRepo) Create(ctx context.Context, community *communities.Community) (*communities.Community, error) {
+
query := `
+
INSERT INTO communities (
+
did, handle, name, display_name, description, description_facets,
+
avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
+
visibility, allow_external_discovery, moderation_type, content_warnings,
+
member_count, subscriber_count, post_count,
+
federated_from, federated_id, created_at, updated_at,
+
record_uri, record_cid
+
) VALUES (
+
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15,
+
$16, $17, $18, $19, $20, $21, $22, $23, $24
+
)
+
RETURNING id, created_at, updated_at`
+
+
// Handle JSONB field - use sql.NullString with valid JSON or NULL
+
var descFacets interface{}
+
if community.DescriptionFacets != nil && len(community.DescriptionFacets) > 0 {
+
descFacets = community.DescriptionFacets
+
} else {
+
descFacets = nil
+
}
+
+
err := r.db.QueryRowContext(ctx, query,
+
community.DID,
+
community.Handle,
+
community.Name,
+
nullString(community.DisplayName),
+
nullString(community.Description),
+
descFacets,
+
nullString(community.AvatarCID),
+
nullString(community.BannerCID),
+
community.OwnerDID,
+
community.CreatedByDID,
+
community.HostedByDID,
+
community.Visibility,
+
community.AllowExternalDiscovery,
+
nullString(community.ModerationType),
+
pq.Array(community.ContentWarnings),
+
community.MemberCount,
+
community.SubscriberCount,
+
community.PostCount,
+
nullString(community.FederatedFrom),
+
nullString(community.FederatedID),
+
community.CreatedAt,
+
community.UpdatedAt,
+
nullString(community.RecordURI),
+
nullString(community.RecordCID),
+
).Scan(&community.ID, &community.CreatedAt, &community.UpdatedAt)
+
+
if err != nil {
+
// Check for unique constraint violations
+
if strings.Contains(err.Error(), "duplicate key") {
+
if strings.Contains(err.Error(), "communities_did_key") {
+
return nil, communities.ErrCommunityAlreadyExists
+
}
+
if strings.Contains(err.Error(), "communities_handle_key") {
+
return nil, communities.ErrHandleTaken
+
}
+
}
+
return nil, fmt.Errorf("failed to create community: %w", err)
+
}
+
+
return community, nil
+
}
+
+
// GetByDID retrieves a community by its DID
+
func (r *postgresCommunityRepo) GetByDID(ctx context.Context, did string) (*communities.Community, error) {
+
community := &communities.Community{}
+
query := `
+
SELECT id, did, handle, name, display_name, description, description_facets,
+
avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
+
visibility, allow_external_discovery, moderation_type, content_warnings,
+
member_count, subscriber_count, post_count,
+
federated_from, federated_id, created_at, updated_at,
+
record_uri, record_cid
+
FROM communities
+
WHERE did = $1`
+
+
var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
+
var federatedFrom, federatedID, recordURI, recordCID sql.NullString
+
var descFacets []byte
+
var contentWarnings []string
+
+
err := r.db.QueryRowContext(ctx, query, did).Scan(
+
&community.ID, &community.DID, &community.Handle, &community.Name,
+
&displayName, &description, &descFacets,
+
&avatarCID, &bannerCID,
+
&community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
+
&community.Visibility, &community.AllowExternalDiscovery,
+
&moderationType, pq.Array(&contentWarnings),
+
&community.MemberCount, &community.SubscriberCount, &community.PostCount,
+
&federatedFrom, &federatedID,
+
&community.CreatedAt, &community.UpdatedAt,
+
&recordURI, &recordCID,
+
)
+
+
if err == sql.ErrNoRows {
+
return nil, communities.ErrCommunityNotFound
+
}
+
if err != nil {
+
return nil, fmt.Errorf("failed to get community by DID: %w", err)
+
}
+
+
// Map nullable fields
+
community.DisplayName = displayName.String
+
community.Description = description.String
+
community.AvatarCID = avatarCID.String
+
community.BannerCID = bannerCID.String
+
community.ModerationType = moderationType.String
+
community.ContentWarnings = contentWarnings
+
community.FederatedFrom = federatedFrom.String
+
community.FederatedID = federatedID.String
+
community.RecordURI = recordURI.String
+
community.RecordCID = recordCID.String
+
if descFacets != nil {
+
community.DescriptionFacets = descFacets
+
}
+
+
return community, nil
+
}
+
+
// GetByHandle retrieves a community by its scoped handle
+
func (r *postgresCommunityRepo) GetByHandle(ctx context.Context, handle string) (*communities.Community, error) {
+
community := &communities.Community{}
+
query := `
+
SELECT id, did, handle, name, display_name, description, description_facets,
+
avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
+
visibility, allow_external_discovery, moderation_type, content_warnings,
+
member_count, subscriber_count, post_count,
+
federated_from, federated_id, created_at, updated_at,
+
record_uri, record_cid
+
FROM communities
+
WHERE handle = $1`
+
+
var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
+
var federatedFrom, federatedID, recordURI, recordCID sql.NullString
+
var descFacets []byte
+
var contentWarnings []string
+
+
err := r.db.QueryRowContext(ctx, query, handle).Scan(
+
&community.ID, &community.DID, &community.Handle, &community.Name,
+
&displayName, &description, &descFacets,
+
&avatarCID, &bannerCID,
+
&community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
+
&community.Visibility, &community.AllowExternalDiscovery,
+
&moderationType, pq.Array(&contentWarnings),
+
&community.MemberCount, &community.SubscriberCount, &community.PostCount,
+
&federatedFrom, &federatedID,
+
&community.CreatedAt, &community.UpdatedAt,
+
&recordURI, &recordCID,
+
)
+
+
if err == sql.ErrNoRows {
+
return nil, communities.ErrCommunityNotFound
+
}
+
if err != nil {
+
return nil, fmt.Errorf("failed to get community by handle: %w", err)
+
}
+
+
// Map nullable fields
+
community.DisplayName = displayName.String
+
community.Description = description.String
+
community.AvatarCID = avatarCID.String
+
community.BannerCID = bannerCID.String
+
community.ModerationType = moderationType.String
+
community.ContentWarnings = contentWarnings
+
community.FederatedFrom = federatedFrom.String
+
community.FederatedID = federatedID.String
+
community.RecordURI = recordURI.String
+
community.RecordCID = recordCID.String
+
if descFacets != nil {
+
community.DescriptionFacets = descFacets
+
}
+
+
return community, nil
+
}
+
+
// Update modifies an existing community's metadata
+
func (r *postgresCommunityRepo) Update(ctx context.Context, community *communities.Community) (*communities.Community, error) {
+
query := `
+
UPDATE communities
+
SET display_name = $2, description = $3, description_facets = $4,
+
avatar_cid = $5, banner_cid = $6,
+
visibility = $7, allow_external_discovery = $8,
+
moderation_type = $9, content_warnings = $10,
+
updated_at = NOW(),
+
record_uri = $11, record_cid = $12
+
WHERE did = $1
+
RETURNING updated_at`
+
+
// Handle JSONB field - use sql.NullString with valid JSON or NULL
+
var descFacets interface{}
+
if community.DescriptionFacets != nil && len(community.DescriptionFacets) > 0 {
+
descFacets = community.DescriptionFacets
+
} else {
+
descFacets = nil
+
}
+
+
err := r.db.QueryRowContext(ctx, query,
+
community.DID,
+
nullString(community.DisplayName),
+
nullString(community.Description),
+
descFacets,
+
nullString(community.AvatarCID),
+
nullString(community.BannerCID),
+
community.Visibility,
+
community.AllowExternalDiscovery,
+
nullString(community.ModerationType),
+
pq.Array(community.ContentWarnings),
+
nullString(community.RecordURI),
+
nullString(community.RecordCID),
+
).Scan(&community.UpdatedAt)
+
+
if err == sql.ErrNoRows {
+
return nil, communities.ErrCommunityNotFound
+
}
+
if err != nil {
+
return nil, fmt.Errorf("failed to update community: %w", err)
+
}
+
+
return community, nil
+
}
+
+
// Delete removes a community from the database
+
func (r *postgresCommunityRepo) Delete(ctx context.Context, did string) error {
+
query := `DELETE FROM communities WHERE did = $1`
+
+
result, err := r.db.ExecContext(ctx, query, did)
+
if err != nil {
+
return fmt.Errorf("failed to delete community: %w", err)
+
}
+
+
rowsAffected, err := result.RowsAffected()
+
if err != nil {
+
return fmt.Errorf("failed to check delete result: %w", err)
+
}
+
+
if rowsAffected == 0 {
+
return communities.ErrCommunityNotFound
+
}
+
+
return nil
+
}
+
+
// List retrieves communities with filtering and pagination
+
func (r *postgresCommunityRepo) List(ctx context.Context, req communities.ListCommunitiesRequest) ([]*communities.Community, int, error) {
+
// Build query with filters
+
whereClauses := []string{}
+
args := []interface{}{}
+
argCount := 1
+
+
if req.Visibility != "" {
+
whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount))
+
args = append(args, req.Visibility)
+
argCount++
+
}
+
+
if req.HostedBy != "" {
+
whereClauses = append(whereClauses, fmt.Sprintf("hosted_by_did = $%d", argCount))
+
args = append(args, req.HostedBy)
+
argCount++
+
}
+
+
whereClause := ""
+
if len(whereClauses) > 0 {
+
whereClause = "WHERE " + strings.Join(whereClauses, " AND ")
+
}
+
+
// Get total count
+
countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause)
+
var totalCount int
+
err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount)
+
if err != nil {
+
return nil, 0, fmt.Errorf("failed to count communities: %w", err)
+
}
+
+
// Build sort clause
+
sortColumn := "created_at"
+
if req.SortBy != "" {
+
switch req.SortBy {
+
case "member_count", "subscriber_count", "post_count", "created_at":
+
sortColumn = req.SortBy
+
}
+
}
+
+
sortOrder := "DESC"
+
if strings.ToUpper(req.SortOrder) == "ASC" {
+
sortOrder = "ASC"
+
}
+
+
// Get communities with pagination
+
query := fmt.Sprintf(`
+
SELECT id, did, handle, name, display_name, description, description_facets,
+
avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
+
visibility, allow_external_discovery, moderation_type, content_warnings,
+
member_count, subscriber_count, post_count,
+
federated_from, federated_id, created_at, updated_at,
+
record_uri, record_cid
+
FROM communities
+
%s
+
ORDER BY %s %s
+
LIMIT $%d OFFSET $%d`,
+
whereClause, sortColumn, sortOrder, argCount, argCount+1)
+
+
args = append(args, req.Limit, req.Offset)
+
+
rows, err := r.db.QueryContext(ctx, query, args...)
+
if err != nil {
+
return nil, 0, fmt.Errorf("failed to list communities: %w", err)
+
}
+
defer rows.Close()
+
+
result := []*communities.Community{}
+
for rows.Next() {
+
community := &communities.Community{}
+
var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
+
var federatedFrom, federatedID, recordURI, recordCID sql.NullString
+
var descFacets []byte
+
var contentWarnings []string
+
+
err := rows.Scan(
+
&community.ID, &community.DID, &community.Handle, &community.Name,
+
&displayName, &description, &descFacets,
+
&avatarCID, &bannerCID,
+
&community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
+
&community.Visibility, &community.AllowExternalDiscovery,
+
&moderationType, pq.Array(&contentWarnings),
+
&community.MemberCount, &community.SubscriberCount, &community.PostCount,
+
&federatedFrom, &federatedID,
+
&community.CreatedAt, &community.UpdatedAt,
+
&recordURI, &recordCID,
+
)
+
if err != nil {
+
return nil, 0, fmt.Errorf("failed to scan community: %w", err)
+
}
+
+
// Map nullable fields
+
community.DisplayName = displayName.String
+
community.Description = description.String
+
community.AvatarCID = avatarCID.String
+
community.BannerCID = bannerCID.String
+
community.ModerationType = moderationType.String
+
community.ContentWarnings = contentWarnings
+
community.FederatedFrom = federatedFrom.String
+
community.FederatedID = federatedID.String
+
community.RecordURI = recordURI.String
+
community.RecordCID = recordCID.String
+
if descFacets != nil {
+
community.DescriptionFacets = descFacets
+
}
+
+
result = append(result, community)
+
}
+
+
if err = rows.Err(); err != nil {
+
return nil, 0, fmt.Errorf("error iterating communities: %w", err)
+
}
+
+
return result, totalCount, nil
+
}
+
+
// Search searches communities by name/description using fuzzy matching
+
func (r *postgresCommunityRepo) Search(ctx context.Context, req communities.SearchCommunitiesRequest) ([]*communities.Community, int, error) {
+
// Build query with fuzzy search and visibility filter
+
whereClauses := []string{
+
"(name ILIKE '%' || $1 || '%' OR description ILIKE '%' || $1 || '%')",
+
}
+
args := []interface{}{req.Query}
+
argCount := 2
+
+
if req.Visibility != "" {
+
whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount))
+
args = append(args, req.Visibility)
+
argCount++
+
}
+
+
whereClause := "WHERE " + strings.Join(whereClauses, " AND ")
+
+
// Get total count
+
countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause)
+
var totalCount int
+
err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount)
+
if err != nil {
+
return nil, 0, fmt.Errorf("failed to count search results: %w", err)
+
}
+
+
// Search with relevance ranking using pg_trgm similarity
+
// Filter out results with very low relevance (< 0.2) to avoid noise
+
query := fmt.Sprintf(`
+
SELECT id, did, handle, name, display_name, description, description_facets,
+
avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
+
visibility, allow_external_discovery, moderation_type, content_warnings,
+
member_count, subscriber_count, post_count,
+
federated_from, federated_id, created_at, updated_at,
+
record_uri, record_cid,
+
similarity(name, $1) + similarity(COALESCE(description, ''), $1) as relevance
+
FROM communities
+
%s AND (similarity(name, $1) + similarity(COALESCE(description, ''), $1)) > 0.2
+
ORDER BY relevance DESC, member_count DESC
+
LIMIT $%d OFFSET $%d`,
+
whereClause, argCount, argCount+1)
+
+
args = append(args, req.Limit, req.Offset)
+
+
rows, err := r.db.QueryContext(ctx, query, args...)
+
if err != nil {
+
return nil, 0, fmt.Errorf("failed to search communities: %w", err)
+
}
+
defer rows.Close()
+
+
result := []*communities.Community{}
+
for rows.Next() {
+
community := &communities.Community{}
+
var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
+
var federatedFrom, federatedID, recordURI, recordCID sql.NullString
+
var descFacets []byte
+
var contentWarnings []string
+
var relevance float64
+
+
err := rows.Scan(
+
&community.ID, &community.DID, &community.Handle, &community.Name,
+
&displayName, &description, &descFacets,
+
&avatarCID, &bannerCID,
+
&community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
+
&community.Visibility, &community.AllowExternalDiscovery,
+
&moderationType, pq.Array(&contentWarnings),
+
&community.MemberCount, &community.SubscriberCount, &community.PostCount,
+
&federatedFrom, &federatedID,
+
&community.CreatedAt, &community.UpdatedAt,
+
&recordURI, &recordCID,
+
&relevance,
+
)
+
if err != nil {
+
return nil, 0, fmt.Errorf("failed to scan community: %w", err)
+
}
+
+
// Map nullable fields
+
community.DisplayName = displayName.String
+
community.Description = description.String
+
community.AvatarCID = avatarCID.String
+
community.BannerCID = bannerCID.String
+
community.ModerationType = moderationType.String
+
community.ContentWarnings = contentWarnings
+
community.FederatedFrom = federatedFrom.String
+
community.FederatedID = federatedID.String
+
community.RecordURI = recordURI.String
+
community.RecordCID = recordCID.String
+
if descFacets != nil {
+
community.DescriptionFacets = descFacets
+
}
+
+
result = append(result, community)
+
}
+
+
if err = rows.Err(); err != nil {
+
return nil, 0, fmt.Errorf("error iterating search results: %w", err)
+
}
+
+
return result, totalCount, nil
+
}
+
+
// Helper functions
+
func nullString(s string) sql.NullString {
+
return sql.NullString{String: s, Valid: s != ""}
+
}
+
+
func nullBytes(b []byte) []byte {
+
if b == nil || len(b) == 0 {
+
return nil
+
}
+
return b
+
}
+272
internal/db/postgres/community_repo_memberships.go
···
+
package postgres
+
+
import (
+
"context"
+
"database/sql"
+
"fmt"
+
"strings"
+
+
"Coves/internal/core/communities"
+
)
+
+
// CreateMembership creates a new membership record
+
func (r *postgresCommunityRepo) CreateMembership(ctx context.Context, membership *communities.Membership) (*communities.Membership, error) {
+
query := `
+
INSERT INTO community_memberships (
+
user_did, community_did, reputation_score, contribution_count,
+
joined_at, last_active_at, is_banned, is_moderator
+
)
+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+
RETURNING id, joined_at, last_active_at`
+
+
err := r.db.QueryRowContext(ctx, query,
+
membership.UserDID,
+
membership.CommunityDID,
+
membership.ReputationScore,
+
membership.ContributionCount,
+
membership.JoinedAt,
+
membership.LastActiveAt,
+
membership.IsBanned,
+
membership.IsModerator,
+
).Scan(&membership.ID, &membership.JoinedAt, &membership.LastActiveAt)
+
+
if err != nil {
+
if strings.Contains(err.Error(), "duplicate key") {
+
return nil, fmt.Errorf("membership already exists")
+
}
+
if strings.Contains(err.Error(), "foreign key") {
+
return nil, communities.ErrCommunityNotFound
+
}
+
return nil, fmt.Errorf("failed to create membership: %w", err)
+
}
+
+
return membership, nil
+
}
+
+
// GetMembership retrieves a specific membership
+
func (r *postgresCommunityRepo) GetMembership(ctx context.Context, userDID, communityDID string) (*communities.Membership, error) {
+
membership := &communities.Membership{}
+
query := `
+
SELECT id, user_did, community_did, reputation_score, contribution_count,
+
joined_at, last_active_at, is_banned, is_moderator
+
FROM community_memberships
+
WHERE user_did = $1 AND community_did = $2`
+
+
err := r.db.QueryRowContext(ctx, query, userDID, communityDID).Scan(
+
&membership.ID,
+
&membership.UserDID,
+
&membership.CommunityDID,
+
&membership.ReputationScore,
+
&membership.ContributionCount,
+
&membership.JoinedAt,
+
&membership.LastActiveAt,
+
&membership.IsBanned,
+
&membership.IsModerator,
+
)
+
+
if err == sql.ErrNoRows {
+
return nil, communities.ErrMembershipNotFound
+
}
+
if err != nil {
+
return nil, fmt.Errorf("failed to get membership: %w", err)
+
}
+
+
return membership, nil
+
}
+
+
// UpdateMembership updates an existing membership
+
func (r *postgresCommunityRepo) UpdateMembership(ctx context.Context, membership *communities.Membership) (*communities.Membership, error) {
+
query := `
+
UPDATE community_memberships
+
SET reputation_score = $3,
+
contribution_count = $4,
+
last_active_at = $5,
+
is_banned = $6,
+
is_moderator = $7
+
WHERE user_did = $1 AND community_did = $2
+
RETURNING last_active_at`
+
+
err := r.db.QueryRowContext(ctx, query,
+
membership.UserDID,
+
membership.CommunityDID,
+
membership.ReputationScore,
+
membership.ContributionCount,
+
membership.LastActiveAt,
+
membership.IsBanned,
+
membership.IsModerator,
+
).Scan(&membership.LastActiveAt)
+
+
if err == sql.ErrNoRows {
+
return nil, communities.ErrMembershipNotFound
+
}
+
if err != nil {
+
return nil, fmt.Errorf("failed to update membership: %w", err)
+
}
+
+
return membership, nil
+
}
+
+
// ListMembers retrieves members of a community ordered by reputation
+
func (r *postgresCommunityRepo) ListMembers(ctx context.Context, communityDID string, limit, offset int) ([]*communities.Membership, error) {
+
query := `
+
SELECT id, user_did, community_did, reputation_score, contribution_count,
+
joined_at, last_active_at, is_banned, is_moderator
+
FROM community_memberships
+
WHERE community_did = $1
+
ORDER BY reputation_score DESC, joined_at ASC
+
LIMIT $2 OFFSET $3`
+
+
rows, err := r.db.QueryContext(ctx, query, communityDID, limit, offset)
+
if err != nil {
+
return nil, fmt.Errorf("failed to list members: %w", err)
+
}
+
defer rows.Close()
+
+
result := []*communities.Membership{}
+
for rows.Next() {
+
membership := &communities.Membership{}
+
+
err := rows.Scan(
+
&membership.ID,
+
&membership.UserDID,
+
&membership.CommunityDID,
+
&membership.ReputationScore,
+
&membership.ContributionCount,
+
&membership.JoinedAt,
+
&membership.LastActiveAt,
+
&membership.IsBanned,
+
&membership.IsModerator,
+
)
+
if err != nil {
+
return nil, fmt.Errorf("failed to scan member: %w", err)
+
}
+
+
result = append(result, membership)
+
}
+
+
if err = rows.Err(); err != nil {
+
return nil, fmt.Errorf("error iterating members: %w", err)
+
}
+
+
return result, nil
+
}
+
+
// CreateModerationAction records a moderation action
+
func (r *postgresCommunityRepo) CreateModerationAction(ctx context.Context, action *communities.ModerationAction) (*communities.ModerationAction, error) {
+
query := `
+
INSERT INTO community_moderation (
+
community_did, action, reason, instance_did, broadcast, created_at, expires_at
+
)
+
VALUES ($1, $2, $3, $4, $5, $6, $7)
+
RETURNING id, created_at`
+
+
err := r.db.QueryRowContext(ctx, query,
+
action.CommunityDID,
+
action.Action,
+
nullString(action.Reason),
+
action.InstanceDID,
+
action.Broadcast,
+
action.CreatedAt,
+
action.ExpiresAt,
+
).Scan(&action.ID, &action.CreatedAt)
+
+
if err != nil {
+
if strings.Contains(err.Error(), "foreign key") {
+
return nil, communities.ErrCommunityNotFound
+
}
+
return nil, fmt.Errorf("failed to create moderation action: %w", err)
+
}
+
+
return action, nil
+
}
+
+
// ListModerationActions retrieves moderation actions for a community
+
func (r *postgresCommunityRepo) ListModerationActions(ctx context.Context, communityDID string, limit, offset int) ([]*communities.ModerationAction, error) {
+
query := `
+
SELECT id, community_did, action, reason, instance_did, broadcast, created_at, expires_at
+
FROM community_moderation
+
WHERE community_did = $1
+
ORDER BY created_at DESC
+
LIMIT $2 OFFSET $3`
+
+
rows, err := r.db.QueryContext(ctx, query, communityDID, limit, offset)
+
if err != nil {
+
return nil, fmt.Errorf("failed to list moderation actions: %w", err)
+
}
+
defer rows.Close()
+
+
result := []*communities.ModerationAction{}
+
for rows.Next() {
+
action := &communities.ModerationAction{}
+
var reason sql.NullString
+
+
err := rows.Scan(
+
&action.ID,
+
&action.CommunityDID,
+
&action.Action,
+
&reason,
+
&action.InstanceDID,
+
&action.Broadcast,
+
&action.CreatedAt,
+
&action.ExpiresAt,
+
)
+
if err != nil {
+
return nil, fmt.Errorf("failed to scan moderation action: %w", err)
+
}
+
+
action.Reason = reason.String
+
result = append(result, action)
+
}
+
+
if err = rows.Err(); err != nil {
+
return nil, fmt.Errorf("error iterating moderation actions: %w", err)
+
}
+
+
return result, nil
+
}
+
+
// Statistics methods
+
func (r *postgresCommunityRepo) IncrementMemberCount(ctx context.Context, communityDID string) error {
+
query := `UPDATE communities SET member_count = member_count + 1 WHERE did = $1`
+
_, err := r.db.ExecContext(ctx, query, communityDID)
+
if err != nil {
+
return fmt.Errorf("failed to increment member count: %w", err)
+
}
+
return nil
+
}
+
+
func (r *postgresCommunityRepo) DecrementMemberCount(ctx context.Context, communityDID string) error {
+
query := `UPDATE communities SET member_count = GREATEST(0, member_count - 1) WHERE did = $1`
+
_, err := r.db.ExecContext(ctx, query, communityDID)
+
if err != nil {
+
return fmt.Errorf("failed to decrement member count: %w", err)
+
}
+
return nil
+
}
+
+
func (r *postgresCommunityRepo) IncrementSubscriberCount(ctx context.Context, communityDID string) error {
+
query := `UPDATE communities SET subscriber_count = subscriber_count + 1 WHERE did = $1`
+
_, err := r.db.ExecContext(ctx, query, communityDID)
+
if err != nil {
+
return fmt.Errorf("failed to increment subscriber count: %w", err)
+
}
+
return nil
+
}
+
+
func (r *postgresCommunityRepo) DecrementSubscriberCount(ctx context.Context, communityDID string) error {
+
query := `UPDATE communities SET subscriber_count = GREATEST(0, subscriber_count - 1) WHERE did = $1`
+
_, err := r.db.ExecContext(ctx, query, communityDID)
+
if err != nil {
+
return fmt.Errorf("failed to decrement subscriber count: %w", err)
+
}
+
return nil
+
}
+
+
func (r *postgresCommunityRepo) IncrementPostCount(ctx context.Context, communityDID string) error {
+
query := `UPDATE communities SET post_count = post_count + 1 WHERE did = $1`
+
_, err := r.db.ExecContext(ctx, query, communityDID)
+
if err != nil {
+
return fmt.Errorf("failed to increment post count: %w", err)
+
}
+
return nil
+
}
+292
internal/db/postgres/community_repo_subscriptions.go
···
+
package postgres
+
+
import (
+
"context"
+
"database/sql"
+
"fmt"
+
"strings"
+
+
"Coves/internal/core/communities"
+
)
+
+
// Subscribe creates a new subscription record
+
func (r *postgresCommunityRepo) Subscribe(ctx context.Context, subscription *communities.Subscription) (*communities.Subscription, error) {
+
query := `
+
INSERT INTO community_subscriptions (user_did, community_did, subscribed_at, record_uri, record_cid)
+
VALUES ($1, $2, $3, $4, $5)
+
RETURNING id, subscribed_at`
+
+
err := r.db.QueryRowContext(ctx, query,
+
subscription.UserDID,
+
subscription.CommunityDID,
+
subscription.SubscribedAt,
+
nullString(subscription.RecordURI),
+
nullString(subscription.RecordCID),
+
).Scan(&subscription.ID, &subscription.SubscribedAt)
+
+
if err != nil {
+
if strings.Contains(err.Error(), "duplicate key") {
+
return nil, communities.ErrSubscriptionAlreadyExists
+
}
+
if strings.Contains(err.Error(), "foreign key") {
+
return nil, communities.ErrCommunityNotFound
+
}
+
return nil, fmt.Errorf("failed to create subscription: %w", err)
+
}
+
+
return subscription, nil
+
}
+
+
// SubscribeWithCount atomically creates subscription and increments subscriber count
+
// This is idempotent - safe for Jetstream replays
+
func (r *postgresCommunityRepo) SubscribeWithCount(ctx context.Context, subscription *communities.Subscription) (*communities.Subscription, error) {
+
tx, err := r.db.BeginTx(ctx, nil)
+
if err != nil {
+
return nil, fmt.Errorf("failed to begin transaction: %w", err)
+
}
+
defer tx.Rollback()
+
+
// Insert subscription with ON CONFLICT DO NOTHING for idempotency
+
query := `
+
INSERT INTO community_subscriptions (user_did, community_did, subscribed_at, record_uri, record_cid)
+
VALUES ($1, $2, $3, $4, $5)
+
ON CONFLICT (user_did, community_did) DO NOTHING
+
RETURNING id, subscribed_at`
+
+
err = tx.QueryRowContext(ctx, query,
+
subscription.UserDID,
+
subscription.CommunityDID,
+
subscription.SubscribedAt,
+
nullString(subscription.RecordURI),
+
nullString(subscription.RecordCID),
+
).Scan(&subscription.ID, &subscription.SubscribedAt)
+
+
// If no rows returned, subscription already existed (idempotent behavior)
+
if err == sql.ErrNoRows {
+
// Get existing subscription
+
query = `SELECT id, subscribed_at FROM community_subscriptions WHERE user_did = $1 AND community_did = $2`
+
err = tx.QueryRowContext(ctx, query, subscription.UserDID, subscription.CommunityDID).Scan(&subscription.ID, &subscription.SubscribedAt)
+
if err != nil {
+
return nil, fmt.Errorf("failed to get existing subscription: %w", err)
+
}
+
// Don't increment count - subscription already existed
+
if err := tx.Commit(); err != nil {
+
return nil, fmt.Errorf("failed to commit transaction: %w", err)
+
}
+
return subscription, nil
+
}
+
+
if err != nil {
+
if strings.Contains(err.Error(), "foreign key") {
+
return nil, communities.ErrCommunityNotFound
+
}
+
return nil, fmt.Errorf("failed to create subscription: %w", err)
+
}
+
+
// Increment subscriber count only if insert succeeded
+
incrementQuery := `
+
UPDATE communities
+
SET subscriber_count = subscriber_count + 1, updated_at = NOW()
+
WHERE did = $1`
+
+
_, err = tx.ExecContext(ctx, incrementQuery, subscription.CommunityDID)
+
if err != nil {
+
return nil, fmt.Errorf("failed to increment subscriber count: %w", err)
+
}
+
+
if err := tx.Commit(); err != nil {
+
return nil, fmt.Errorf("failed to commit transaction: %w", err)
+
}
+
+
return subscription, nil
+
}
+
+
// Unsubscribe removes a subscription record
+
func (r *postgresCommunityRepo) Unsubscribe(ctx context.Context, userDID, communityDID string) error {
+
query := `DELETE FROM community_subscriptions WHERE user_did = $1 AND community_did = $2`
+
+
result, err := r.db.ExecContext(ctx, query, userDID, communityDID)
+
if err != nil {
+
return fmt.Errorf("failed to unsubscribe: %w", err)
+
}
+
+
rowsAffected, err := result.RowsAffected()
+
if err != nil {
+
return fmt.Errorf("failed to check unsubscribe result: %w", err)
+
}
+
+
if rowsAffected == 0 {
+
return communities.ErrSubscriptionNotFound
+
}
+
+
return nil
+
}
+
+
// UnsubscribeWithCount atomically removes subscription and decrements subscriber count
+
// This is idempotent - safe for Jetstream replays
+
func (r *postgresCommunityRepo) UnsubscribeWithCount(ctx context.Context, userDID, communityDID string) error {
+
tx, err := r.db.BeginTx(ctx, nil)
+
if err != nil {
+
return fmt.Errorf("failed to begin transaction: %w", err)
+
}
+
defer tx.Rollback()
+
+
// Delete subscription
+
deleteQuery := `DELETE FROM community_subscriptions WHERE user_did = $1 AND community_did = $2`
+
result, err := tx.ExecContext(ctx, deleteQuery, userDID, communityDID)
+
if err != nil {
+
return fmt.Errorf("failed to unsubscribe: %w", err)
+
}
+
+
rowsAffected, err := result.RowsAffected()
+
if err != nil {
+
return fmt.Errorf("failed to check unsubscribe result: %w", err)
+
}
+
+
// If no rows deleted, subscription didn't exist (idempotent - not an error)
+
if rowsAffected == 0 {
+
if err := tx.Commit(); err != nil {
+
return fmt.Errorf("failed to commit transaction: %w", err)
+
}
+
return nil
+
}
+
+
// Decrement subscriber count only if delete succeeded
+
decrementQuery := `
+
UPDATE communities
+
SET subscriber_count = GREATEST(0, subscriber_count - 1), updated_at = NOW()
+
WHERE did = $1`
+
+
_, err = tx.ExecContext(ctx, decrementQuery, communityDID)
+
if err != nil {
+
return fmt.Errorf("failed to decrement subscriber count: %w", err)
+
}
+
+
if err := tx.Commit(); err != nil {
+
return fmt.Errorf("failed to commit transaction: %w", err)
+
}
+
+
return nil
+
}
+
+
// GetSubscription retrieves a specific subscription
+
func (r *postgresCommunityRepo) GetSubscription(ctx context.Context, userDID, communityDID string) (*communities.Subscription, error) {
+
subscription := &communities.Subscription{}
+
query := `
+
SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid
+
FROM community_subscriptions
+
WHERE user_did = $1 AND community_did = $2`
+
+
var recordURI, recordCID sql.NullString
+
+
err := r.db.QueryRowContext(ctx, query, userDID, communityDID).Scan(
+
&subscription.ID,
+
&subscription.UserDID,
+
&subscription.CommunityDID,
+
&subscription.SubscribedAt,
+
&recordURI,
+
&recordCID,
+
)
+
+
if err == sql.ErrNoRows {
+
return nil, communities.ErrSubscriptionNotFound
+
}
+
if err != nil {
+
return nil, fmt.Errorf("failed to get subscription: %w", err)
+
}
+
+
subscription.RecordURI = recordURI.String
+
subscription.RecordCID = recordCID.String
+
+
return subscription, nil
+
}
+
+
// ListSubscriptions retrieves all subscriptions for a user
+
func (r *postgresCommunityRepo) ListSubscriptions(ctx context.Context, userDID string, limit, offset int) ([]*communities.Subscription, error) {
+
query := `
+
SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid
+
FROM community_subscriptions
+
WHERE user_did = $1
+
ORDER BY subscribed_at DESC
+
LIMIT $2 OFFSET $3`
+
+
rows, err := r.db.QueryContext(ctx, query, userDID, limit, offset)
+
if err != nil {
+
return nil, fmt.Errorf("failed to list subscriptions: %w", err)
+
}
+
defer rows.Close()
+
+
result := []*communities.Subscription{}
+
for rows.Next() {
+
subscription := &communities.Subscription{}
+
var recordURI, recordCID sql.NullString
+
+
err := rows.Scan(
+
&subscription.ID,
+
&subscription.UserDID,
+
&subscription.CommunityDID,
+
&subscription.SubscribedAt,
+
&recordURI,
+
&recordCID,
+
)
+
if err != nil {
+
return nil, fmt.Errorf("failed to scan subscription: %w", err)
+
}
+
+
subscription.RecordURI = recordURI.String
+
subscription.RecordCID = recordCID.String
+
+
result = append(result, subscription)
+
}
+
+
if err = rows.Err(); err != nil {
+
return nil, fmt.Errorf("error iterating subscriptions: %w", err)
+
}
+
+
return result, nil
+
}
+
+
// ListSubscribers retrieves all subscribers for a community
+
func (r *postgresCommunityRepo) ListSubscribers(ctx context.Context, communityDID string, limit, offset int) ([]*communities.Subscription, error) {
+
query := `
+
SELECT id, user_did, community_did, subscribed_at, record_uri, record_cid
+
FROM community_subscriptions
+
WHERE community_did = $1
+
ORDER BY subscribed_at DESC
+
LIMIT $2 OFFSET $3`
+
+
rows, err := r.db.QueryContext(ctx, query, communityDID, limit, offset)
+
if err != nil {
+
return nil, fmt.Errorf("failed to list subscribers: %w", err)
+
}
+
defer rows.Close()
+
+
result := []*communities.Subscription{}
+
for rows.Next() {
+
subscription := &communities.Subscription{}
+
var recordURI, recordCID sql.NullString
+
+
err := rows.Scan(
+
&subscription.ID,
+
&subscription.UserDID,
+
&subscription.CommunityDID,
+
&subscription.SubscribedAt,
+
&recordURI,
+
&recordCID,
+
)
+
if err != nil {
+
return nil, fmt.Errorf("failed to scan subscriber: %w", err)
+
}
+
+
subscription.RecordURI = recordURI.String
+
subscription.RecordCID = recordCID.String
+
+
result = append(result, subscription)
+
}
+
+
if err = rows.Err(); err != nil {
+
return nil, fmt.Errorf("error iterating subscribers: %w", err)
+
}
+
+
return result, nil
+
}