A community based topic aggregation platform built on atproto

Upgrading error handling / gorm column handling / indexes

Changed files
+167 -81
cmd
server
internal
+5 -5
cmd/server/main.go
···
}
r := chi.NewRouter()
-
+
r.Use(middleware.Logger)
r.Use(middleware.Recoverer)
r.Use(middleware.RequestID)
···
Conn: db,
}), &gorm.Config{
DisableForeignKeyConstraintWhenMigrating: true,
-
PrepareStmt: false,
+
PrepareStmt: true, // Enable prepared statements for better performance
})
if err != nil {
log.Fatal("Failed to initialize GORM:", err)
···
// Initialize repositories
userRepo := postgresRepo.NewUserRepository(db)
_ = users.NewUserService(userRepo) // TODO: Use when UserRoutes is fixed
-
+
// Initialize carstore for ATProto repository storage
carDirs := []string{"./data/carstore"}
repoStore, err := carstore.NewRepoStore(gormDB, carDirs)
if err != nil {
log.Fatal("Failed to initialize repo store:", err)
}
-
+
repositoryRepo := postgresRepo.NewRepositoryRepo(db)
repositoryService := repository.NewService(repositoryRepo, repoStore)
···
fmt.Printf("Server starting on port %s\n", port)
log.Fatal(http.ListenAndServe(":"+port, r))
-
}
+
}
+38 -10
internal/atproto/carstore/carstore.go
···
// Initialize Indigo's carstore
cs, err := carstore.NewCarStore(db, carDirs)
if err != nil {
-
return nil, fmt.Errorf("failed to create carstore: %w", err)
+
return nil, fmt.Errorf("initializing carstore: %w", err)
}
return &CarStore{
···
// ImportSlice imports a CAR file slice for a user
func (c *CarStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carData []byte) (cid.Cid, error) {
rootCid, _, err := c.cs.ImportSlice(ctx, uid, since, carData)
-
return rootCid, err
+
if err != nil {
+
return cid.Undef, fmt.Errorf("importing CAR slice for UID %d: %w", uid, err)
+
}
+
return rootCid, nil
}
// ReadUserCar reads a user's repository CAR file
func (c *CarStore) ReadUserCar(ctx context.Context, uid models.Uid, sinceRev string, incremental bool, w io.Writer) error {
-
return c.cs.ReadUserCar(ctx, uid, sinceRev, incremental, w)
+
if err := c.cs.ReadUserCar(ctx, uid, sinceRev, incremental, w); err != nil {
+
return fmt.Errorf("reading user CAR for UID %d: %w", uid, err)
+
}
+
return nil
}
// GetUserRepoHead gets the latest repository head CID for a user
func (c *CarStore) GetUserRepoHead(ctx context.Context, uid models.Uid) (cid.Cid, error) {
-
return c.cs.GetUserRepoHead(ctx, uid)
+
head, err := c.cs.GetUserRepoHead(ctx, uid)
+
if err != nil {
+
return cid.Undef, fmt.Errorf("getting repo head for UID %d: %w", uid, err)
+
}
+
return head, nil
}
// CompactUserShards performs garbage collection and compaction for a user's data
func (c *CarStore) CompactUserShards(ctx context.Context, uid models.Uid, aggressive bool) error {
_, err := c.cs.CompactUserShards(ctx, uid, aggressive)
-
return err
+
if err != nil {
+
return fmt.Errorf("compacting shards for UID %d: %w", uid, err)
+
}
+
return nil
}
// WipeUserData removes all data for a user
func (c *CarStore) WipeUserData(ctx context.Context, uid models.Uid) error {
-
return c.cs.WipeUserData(ctx, uid)
+
if err := c.cs.WipeUserData(ctx, uid); err != nil {
+
return fmt.Errorf("wiping data for UID %d: %w", uid, err)
+
}
+
return nil
}
// NewDeltaSession creates a new session for writing deltas
func (c *CarStore) NewDeltaSession(ctx context.Context, uid models.Uid, since *string) (*carstore.DeltaSession, error) {
-
return c.cs.NewDeltaSession(ctx, uid, since)
+
session, err := c.cs.NewDeltaSession(ctx, uid, since)
+
if err != nil {
+
return nil, fmt.Errorf("creating delta session for UID %d: %w", uid, err)
+
}
+
return session, nil
}
// ReadOnlySession creates a read-only session for reading user data
func (c *CarStore) ReadOnlySession(uid models.Uid) (*carstore.DeltaSession, error) {
-
return c.cs.ReadOnlySession(uid)
+
session, err := c.cs.ReadOnlySession(uid)
+
if err != nil {
+
return nil, fmt.Errorf("creating read-only session for UID %d: %w", uid, err)
+
}
+
return session, nil
}
// Stat returns statistics about the carstore
func (c *CarStore) Stat(ctx context.Context, uid models.Uid) ([]carstore.UserStat, error) {
-
return c.cs.Stat(ctx, uid)
-
}
+
stats, err := c.cs.Stat(ctx, uid)
+
if err != nil {
+
return nil, fmt.Errorf("getting stats for UID %d: %w", uid, err)
+
}
+
return stats, nil
+
}
+11 -11
internal/atproto/carstore/repo_store.go
···
// Create carstore
cs, err := NewCarStore(db, carDirs)
if err != nil {
-
return nil, fmt.Errorf("failed to create carstore: %w", err)
+
return nil, fmt.Errorf("creating carstore: %w", err)
}
// Create user mapping
mapping, err := NewUserMapping(db)
if err != nil {
-
return nil, fmt.Errorf("failed to create user mapping: %w", err)
+
return nil, fmt.Errorf("creating user mapping: %w", err)
}
return &RepoStore{
···
func (rs *RepoStore) ImportRepo(ctx context.Context, did string, carData io.Reader) (cid.Cid, error) {
uid, err := rs.mapping.GetOrCreateUID(ctx, did)
if err != nil {
-
return cid.Undef, fmt.Errorf("failed to get UID for DID %s: %w", did, err)
+
return cid.Undef, fmt.Errorf("getting UID for DID %s: %w", did, err)
}
// Read all data from the reader
data, err := io.ReadAll(carData)
if err != nil {
-
return cid.Undef, fmt.Errorf("failed to read CAR data: %w", err)
+
return cid.Undef, fmt.Errorf("reading CAR data: %w", err)
}
-
+
return rs.cs.ImportSlice(ctx, uid, nil, data)
}
···
func (rs *RepoStore) ReadRepo(ctx context.Context, did string, sinceRev string) ([]byte, error) {
uid, err := rs.mapping.GetUID(did)
if err != nil {
-
return nil, fmt.Errorf("failed to get UID for DID %s: %w", did, err)
+
return nil, fmt.Errorf("getting UID for DID %s: %w", did, err)
}
var buf bytes.Buffer
err = rs.cs.ReadUserCar(ctx, uid, sinceRev, false, &buf)
if err != nil {
-
return nil, fmt.Errorf("failed to read repo for DID %s: %w", did, err)
+
return nil, fmt.Errorf("reading repo for DID %s: %w", did, err)
}
return buf.Bytes(), nil
···
func (rs *RepoStore) GetRepoHead(ctx context.Context, did string) (cid.Cid, error) {
uid, err := rs.mapping.GetUID(did)
if err != nil {
-
return cid.Undef, fmt.Errorf("failed to get UID for DID %s: %w", did, err)
+
return cid.Undef, fmt.Errorf("getting UID for DID %s: %w", did, err)
}
return rs.cs.GetUserRepoHead(ctx, uid)
···
func (rs *RepoStore) CompactRepo(ctx context.Context, did string) error {
uid, err := rs.mapping.GetUID(did)
if err != nil {
-
return fmt.Errorf("failed to get UID for DID %s: %w", did, err)
+
return fmt.Errorf("getting UID for DID %s: %w", did, err)
}
return rs.cs.CompactUserShards(ctx, uid, false)
···
func (rs *RepoStore) DeleteRepo(ctx context.Context, did string) error {
uid, err := rs.mapping.GetUID(did)
if err != nil {
-
return fmt.Errorf("failed to get UID for DID %s: %w", did, err)
+
return fmt.Errorf("getting UID for DID %s: %w", did, err)
}
return rs.cs.WipeUserData(ctx, uid)
···
// GetOrCreateUID gets or creates a UID for a DID
func (rs *RepoStore) GetOrCreateUID(ctx context.Context, did string) (models.Uid, error) {
return rs.mapping.GetOrCreateUID(ctx, did)
-
}
+
}
+11 -11
internal/atproto/carstore/user_mapping.go
···
// UserMapping manages the mapping between DIDs and numeric UIDs required by Indigo's carstore
type UserMapping struct {
-
db *gorm.DB
-
mu sync.RWMutex
-
didToUID map[string]models.Uid
-
uidToDID map[models.Uid]string
-
nextUID models.Uid
+
db *gorm.DB
+
mu sync.RWMutex
+
didToUID map[string]models.Uid
+
uidToDID map[models.Uid]string
+
nextUID models.Uid
}
// UserMap represents the database model for DID to UID mapping
type UserMap struct {
UID models.Uid `gorm:"primaryKey;autoIncrement"`
-
DID string `gorm:"uniqueIndex;not null"`
+
DID string `gorm:"column:did;uniqueIndex;not null"`
CreatedAt int64
UpdatedAt int64
}
···
func NewUserMapping(db *gorm.DB) (*UserMapping, error) {
// Auto-migrate the user mapping table
if err := db.AutoMigrate(&UserMap{}); err != nil {
-
return nil, fmt.Errorf("failed to migrate user mapping table: %w", err)
+
return nil, fmt.Errorf("migrating user mapping table: %w", err)
}
um := &UserMapping{
···
// Load existing mappings
if err := um.loadMappings(); err != nil {
-
return nil, fmt.Errorf("failed to load user mappings: %w", err)
+
return nil, fmt.Errorf("loading user mappings: %w", err)
}
return um, nil
···
func (um *UserMapping) loadMappings() error {
var mappings []UserMap
if err := um.db.Find(&mappings).Error; err != nil {
-
return err
+
return fmt.Errorf("querying user mappings: %w", err)
}
um.mu.Lock()
···
}
if err := um.db.Create(userMap).Error; err != nil {
-
return 0, fmt.Errorf("failed to create user mapping: %w", err)
+
return 0, fmt.Errorf("creating user mapping for DID %s: %w", did, err)
}
um.didToUID[did] = userMap.UID
···
return "", fmt.Errorf("DID not found for UID: %d", uid)
}
return did, nil
-
}
+
}
+20
internal/core/repository/constants.go
···
+
package repository
+
+
import (
+
"github.com/ipfs/go-cid"
+
"github.com/multiformats/go-multihash"
+
)
+
+
var (
+
// PlaceholderCID is used for empty repositories that have no content yet
+
// This allows us to maintain consistency in the repository record
+
// while the actual CAR data is created when records are added
+
PlaceholderCID cid.Cid
+
)
+
+
func init() {
+
// Initialize the placeholder CID once at startup
+
emptyData := []byte("empty")
+
mh, _ := multihash.Sum(emptyData, multihash.SHA2_256, -1)
+
PlaceholderCID = cid.NewCidV1(cid.Raw, mh)
+
}
+28 -35
internal/core/repository/service.go
···
"Coves/internal/atproto/carstore"
"github.com/ipfs/go-cid"
-
"github.com/multiformats/go-multihash"
)
// Service implements the RepositoryService interface using Indigo's carstore
type Service struct {
-
repo RepositoryRepository
-
repoStore *carstore.RepoStore
-
signingKeys map[string]interface{} // DID -> signing key
+
repo RepositoryRepository
+
repoStore *carstore.RepoStore
+
signingKeys map[string]interface{} // DID -> signing key
}
// NewService creates a new repository service using carstore
···
// Check if repository already exists
existing, err := s.repo.GetByDID(did)
if err != nil {
-
return nil, fmt.Errorf("failed to check existing repository: %w", err)
+
return nil, fmt.Errorf("checking existing repository: %w", err)
}
if existing != nil {
return nil, fmt.Errorf("repository already exists for DID: %s", did)
···
// For now, just create the user mapping without importing CAR data
// The actual repository data will be created when records are added
ctx := context.Background()
-
+
// Ensure user mapping exists
_, err = s.repoStore.GetOrCreateUID(ctx, did)
if err != nil {
-
return nil, fmt.Errorf("failed to create user mapping: %w", err)
+
return nil, fmt.Errorf("creating user mapping: %w", err)
}
-
-
// Create a placeholder CID for the empty repository
-
emptyData := []byte("empty")
-
mh, _ := multihash.Sum(emptyData, multihash.SHA2_256, -1)
-
placeholderCID := cid.NewCidV1(cid.Raw, mh)
+
// Use placeholder CID for the empty repository
// Create repository record
repository := &Repository{
DID: did,
-
HeadCID: placeholderCID,
+
HeadCID: PlaceholderCID,
Revision: "rev-0",
RecordCount: 0,
StorageSize: 0,
···
// Save to database
if err := s.repo.Create(repository); err != nil {
-
return nil, fmt.Errorf("failed to save repository: %w", err)
+
return nil, fmt.Errorf("saving repository: %w", err)
}
return repository, nil
···
func (s *Service) GetRepository(did string) (*Repository, error) {
repo, err := s.repo.GetByDID(did)
if err != nil {
-
return nil, fmt.Errorf("failed to get repository: %w", err)
+
return nil, fmt.Errorf("getting repository: %w", err)
}
if repo == nil {
return nil, fmt.Errorf("repository not found for DID: %s", did)
···
func (s *Service) DeleteRepository(did string) error {
// Delete from carstore
if err := s.repoStore.DeleteRepo(context.Background(), did); err != nil {
-
return fmt.Errorf("failed to delete repo from carstore: %w", err)
+
return fmt.Errorf("deleting repo from carstore: %w", err)
}
// Delete from database
if err := s.repo.Delete(did); err != nil {
-
return fmt.Errorf("failed to delete repository: %w", err)
+
return fmt.Errorf("deleting repository from database: %w", err)
}
return nil
···
// First check if repository exists in our database
repo, err := s.repo.GetByDID(did)
if err != nil {
-
return nil, fmt.Errorf("failed to get repository: %w", err)
+
return nil, fmt.Errorf("getting repository: %w", err)
}
if repo == nil {
return nil, fmt.Errorf("repository not found for DID: %s", did)
···
// Check for the specific error pattern from Indigo's carstore
errMsg := err.Error()
if strings.Contains(errMsg, "no data found for user") ||
-
strings.Contains(errMsg, "user not found") {
+
strings.Contains(errMsg, "user not found") {
return []byte{}, nil
}
-
return nil, fmt.Errorf("failed to export repository: %w", err)
+
return nil, fmt.Errorf("exporting repository: %w", err)
}
return carData, nil
···
// ImportRepository imports a repository from a CAR file
func (s *Service) ImportRepository(did string, carData []byte) error {
ctx := context.Background()
-
+
// If empty CAR data, just create user mapping
if len(carData) == 0 {
_, err := s.repoStore.GetOrCreateUID(ctx, did)
if err != nil {
-
return fmt.Errorf("failed to create user mapping: %w", err)
+
return fmt.Errorf("creating user mapping: %w", err)
}
-
-
// Create placeholder CID
-
emptyData := []byte("empty")
-
mh, _ := multihash.Sum(emptyData, multihash.SHA2_256, -1)
-
headCID := cid.NewCidV1(cid.Raw, mh)
-
+
+
// Use placeholder CID for empty repository
+
headCID := PlaceholderCID
+
// Create repository record
repo := &Repository{
DID: did,
···
UpdatedAt: time.Now(),
}
if err := s.repo.Create(repo); err != nil {
-
return fmt.Errorf("failed to create repository: %w", err)
+
return fmt.Errorf("creating repository: %w", err)
}
return nil
}
-
+
// Import non-empty CAR into carstore
headCID, err := s.repoStore.ImportRepo(ctx, did, bytes.NewReader(carData))
if err != nil {
-
return fmt.Errorf("failed to import repository: %w", err)
+
return fmt.Errorf("importing repository: %w", err)
}
// Create or update repository record
repo, err := s.repo.GetByDID(did)
if err != nil {
-
return fmt.Errorf("failed to get repository: %w", err)
+
return fmt.Errorf("getting repository: %w", err)
}
if repo == nil {
···
UpdatedAt: time.Now(),
}
if err := s.repo.Create(repo); err != nil {
-
return fmt.Errorf("failed to create repository: %w", err)
+
return fmt.Errorf("creating repository: %w", err)
}
} else {
// Update existing repository
repo.HeadCID = headCID
repo.UpdatedAt = time.Now()
if err := s.repo.Update(repo); err != nil {
-
return fmt.Errorf("failed to update repository: %w", err)
+
return fmt.Errorf("updating repository: %w", err)
}
}
···
func (s *Service) ListCommits(did string, limit int, cursor string) ([]*Commit, string, error) {
return nil, "", fmt.Errorf("commit operations not yet implemented for carstore")
-
}
+
}
+17 -9
internal/core/repository/service_test.go
···
"Coves/internal/atproto/carstore"
"Coves/internal/core/repository"
"Coves/internal/db/postgres"
-
+
"github.com/ipfs/go-cid"
_ "github.com/lib/pq"
"github.com/pressly/goose/v3"
···
// Connect with GORM using a fresh connection
gormDB, err := gorm.Open(postgresDriver.Open(dbURL), &gorm.Config{
DisableForeignKeyConstraintWhenMigrating: true,
-
PrepareStmt: false,
+
PrepareStmt: false,
})
if err != nil {
t.Fatalf("Failed to create GORM connection: %v", err)
···
gormDB.Exec("DELETE FROM records")
gormDB.Exec("DELETE FROM user_maps")
gormDB.Exec("DELETE FROM car_shards")
+
gormDB.Exec("DELETE FROM block_refs")
+
+
// Close GORM connection
+
if sqlGormDB, err := gormDB.DB(); err == nil {
+
sqlGormDB.Close()
+
}
+
+
// Close original SQL connection
sqlDB.Close()
}
···
// Test DID
testDID := "did:plc:testuser123"
-
+
// Set signing key
service.SetSigningKey(testDID, &mockSigningKey{})
···
t.Fatalf("Failed to create temp dir: %v", err)
}
defer os.RemoveAll(tempDir)
-
+
// Log the temp directory for debugging
t.Logf("Using carstore directory: %s", tempDir)
···
t.Fatalf("Failed to create repository 1: %v", err)
}
t.Logf("Created repository with HeadCID: %s", repo1.HeadCID)
-
+
// Check what's in the database
var userMapCount int
gormDB.Raw("SELECT COUNT(*) FROM user_maps").Scan(&userMapCount)
t.Logf("User maps count: %d", userMapCount)
-
+
var carShardCount int
gormDB.Raw("SELECT COUNT(*) FROM car_shards").Scan(&carShardCount)
t.Logf("Car shards count: %d", carShardCount)
-
+
// Check block_refs too
var blockRefCount int
gormDB.Raw("SELECT COUNT(*) FROM block_refs").Scan(&blockRefCount)
···
func TestRepositoryService_MockedComponents(t *testing.T) {
// Use the existing mock repository from the old test file
_ = NewMockRepositoryRepository()
-
+
// For unit testing without real carstore, we would need to mock RepoStore
// For now, this demonstrates the structure
t.Skip("Mocked carstore tests would require creating mock RepoStore interface")
···
}
return records[start:end], nil
-
}
+
}
+5
internal/db/migrations/003_update_for_carstore.sql
···
-- +goose Up
-- +goose StatementBegin
+
-- WARNING: This migration removes blob storage tables.
+
-- Ensure all blob data has been migrated to carstore before running this migration.
+
-- This migration is NOT reversible if blob data exists!
+
-- Remove the value column from records table since blocks are now stored in filesystem
ALTER TABLE records DROP COLUMN IF EXISTS value;
-- Drop blob-related tables since FileCarStore handles block storage
+
-- WARNING: This will permanently delete all blob data!
DROP TABLE IF EXISTS blob_refs;
DROP TABLE IF EXISTS blobs;
+32
internal/db/migrations/005_add_user_maps_indices.sql
···
+
-- +goose Up
+
-- +goose StatementBegin
+
+
-- Note: The user_maps table is created by GORM's AutoMigrate in the carstore package
+
-- Only add indices if the table exists
+
DO $$
+
BEGIN
+
-- Check if user_maps table exists
+
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'user_maps') THEN
+
-- Check if column exists before creating index
+
IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'user_maps' AND column_name = 'did') THEN
+
-- Explicit column name specified in GORM tag
+
CREATE INDEX IF NOT EXISTS idx_user_maps_did ON user_maps(did);
+
END IF;
+
+
-- Add index on created_at if column exists
+
IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'user_maps' AND column_name = 'created_at') THEN
+
CREATE INDEX IF NOT EXISTS idx_user_maps_created_at ON user_maps(created_at);
+
END IF;
+
END IF;
+
END $$;
+
+
-- +goose StatementEnd
+
+
-- +goose Down
+
-- +goose StatementBegin
+
+
-- Remove indices if they exist
+
DROP INDEX IF EXISTS idx_user_maps_did;
+
DROP INDEX IF EXISTS idx_user_maps_created_at;
+
+
-- +goose StatementEnd