An atproto PDS written in Go

Compare changes

Choose any two refs to compare.

+2 -10
.github/workflows/docker-image.yml
···
push:
branches:
- main
-
tags:
-
- 'v*'
env:
REGISTRY: ghcr.io
···
steps:
- name: Checkout repository
uses: actions/checkout@v4
-
# Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here.
- name: Log in to the Container registry
-
uses: docker/login-action@v3
+
uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
-
# This step uses [docker/metadata-action](https://github.com/docker/metadata-action#about) to extract tags and labels that will be applied to the specified image. The `id` "meta" allows the output of this step to be referenced in a subsequent step. The `images` value provides the base name for the tags and labels.
- name: Extract metadata (tags, labels) for Docker
id: meta
···
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
-
type=raw,value=latest,enable={{is_default_branch}}
type=sha
type=sha,format=long
-
type=semver,pattern={{version}}
-
type=semver,pattern={{major}}.{{minor}}
-
# This step uses the `docker/build-push-action` action to build the image, based on your repository's `Dockerfile`. If the build succeeds, it pushes the image to GitHub Packages.
# It uses the `context` parameter to define the build's context as the set of files located in the specified path. For more information, see "[Usage](https://github.com/docker/build-push-action#usage)" in the README of the `docker/build-push-action` repository.
# It uses the `tags` and `labels` parameters to tag and label the image with the output from the "meta" step.
- name: Build and push Docker image
id: push
-
uses: docker/build-push-action@v6
+
uses: docker/build-push-action@v5
with:
context: .
push: true
-10
Caddyfile.postgres
···
-
{$COCOON_HOSTNAME} {
-
reverse_proxy cocoon:8080
-
-
encode gzip
-
-
log {
-
output file /data/access.log
-
format json
-
}
-
}
+1 -44
README.md
···
docker-compose up -d
```
-
**For PostgreSQL deployment:**
-
```bash
-
# Add POSTGRES_PASSWORD to your .env file first!
-
docker-compose -f docker-compose.postgres.yaml up -d
-
```
-
5. **Get your invite code**
On first run, an invite code is automatically created. View it with:
···
### Optional Configuration
-
#### Database Configuration
-
-
By default, Cocoon uses SQLite which requires no additional setup. For production deployments with higher traffic, you can use PostgreSQL:
-
-
```bash
-
# Database type: sqlite (default) or postgres
-
COCOON_DB_TYPE="postgres"
-
-
# PostgreSQL connection string (required if db-type is postgres)
-
# Format: postgres://user:password@host:port/database?sslmode=disable
-
COCOON_DATABASE_URL="postgres://cocoon:password@localhost:5432/cocoon?sslmode=disable"
-
-
# Or use the standard DATABASE_URL environment variable
-
DATABASE_URL="postgres://cocoon:password@localhost:5432/cocoon?sslmode=disable"
-
```
-
-
For SQLite (default):
-
```bash
-
COCOON_DB_TYPE="sqlite"
-
COCOON_DB_NAME="/data/cocoon/cocoon.db"
-
```
-
-
> **Note**: When using PostgreSQL, database backups to S3 are not handled by Cocoon. Use `pg_dump` or your database provider's backup solution instead.
-
#### SMTP Email Settings
```bash
COCOON_SMTP_USER="your-smtp-username"
···
```
#### S3 Storage
-
-
Cocoon supports S3-compatible storage for both database backups (SQLite only) and blob storage (images, videos, etc.):
-
```bash
-
# Enable S3 backups (SQLite databases only - hourly backups)
COCOON_S3_BACKUPS_ENABLED=true
-
-
# Enable S3 for blob storage (images, videos, etc.)
-
# When enabled, blobs are stored in S3 instead of the database
COCOON_S3_BLOBSTORE_ENABLED=true
-
-
# S3 configuration (works with AWS S3, MinIO, Cloudflare R2, etc.)
COCOON_S3_REGION="us-east-1"
COCOON_S3_BUCKET="your-bucket"
COCOON_S3_ENDPOINT="https://s3.amazonaws.com"
COCOON_S3_ACCESS_KEY="your-access-key"
COCOON_S3_SECRET_KEY="your-secret-key"
```
-
-
**Blob Storage Options:**
-
- `COCOON_S3_BLOBSTORE_ENABLED=false` (default): Blobs stored in the database
-
- `COCOON_S3_BLOBSTORE_ENABLED=true`: Blobs stored in S3 bucket under `blobs/{did}/{cid}`
### Management Commands
···
- [x] `com.atproto.repo.getRecord`
- [x] `com.atproto.repo.importRepo` (Works "okay". Use with extreme caution.)
- [x] `com.atproto.repo.listRecords`
-
- [x] `com.atproto.repo.listMissingBlobs` (Not actually functional, but will return a response as if no blobs were missing)
+
- [ ] `com.atproto.repo.listMissingBlobs`
### Server
+4 -40
cmd/cocoon/main.go
···
"github.com/lestrrat-go/jwx/v2/jwk"
"github.com/urfave/cli/v2"
"golang.org/x/crypto/bcrypt"
-
"gorm.io/driver/postgres"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
···
EnvVars: []string{"COCOON_DB_NAME"},
},
&cli.StringFlag{
-
Name: "db-type",
-
Value: "sqlite",
-
Usage: "Database type: sqlite or postgres",
-
EnvVars: []string{"COCOON_DB_TYPE"},
-
},
-
&cli.StringFlag{
-
Name: "database-url",
-
Aliases: []string{"db-url"},
-
Usage: "PostgreSQL connection string (required if db-type is postgres)",
-
EnvVars: []string{"COCOON_DATABASE_URL", "DATABASE_URL"},
-
},
-
&cli.StringFlag{
Name: "did",
EnvVars: []string{"COCOON_DID"},
},
···
s, err := server.New(&server.Args{
Addr: cmd.String("addr"),
DbName: cmd.String("db-name"),
-
DbType: cmd.String("db-type"),
-
DatabaseURL: cmd.String("database-url"),
Did: cmd.String("did"),
Hostname: cmd.String("hostname"),
RotationKeyPath: cmd.String("rotation-key-path"),
···
},
},
Action: func(cmd *cli.Context) error {
-
db, err := newDb(cmd)
+
db, err := newDb()
if err != nil {
return err
}
···
},
},
Action: func(cmd *cli.Context) error {
-
db, err := newDb(cmd)
+
db, err := newDb()
if err != nil {
return err
}
···
},
}
-
func newDb(cmd *cli.Context) (*gorm.DB, error) {
-
dbType := cmd.String("db-type")
-
if dbType == "" {
-
dbType = "sqlite"
-
}
-
-
switch dbType {
-
case "postgres":
-
databaseURL := cmd.String("database-url")
-
if databaseURL == "" {
-
databaseURL = cmd.String("database-url")
-
}
-
if databaseURL == "" {
-
return nil, fmt.Errorf("COCOON_DATABASE_URL or DATABASE_URL must be set when using postgres")
-
}
-
return gorm.Open(postgres.Open(databaseURL), &gorm.Config{})
-
default:
-
dbName := cmd.String("db-name")
-
if dbName == "" {
-
dbName = "cocoon.db"
-
}
-
return gorm.Open(sqlite.Open(dbName), &gorm.Config{})
-
}
+
func newDb() (*gorm.DB, error) {
+
return gorm.Open(sqlite.Open("cocoon.db"), &gorm.Config{})
}
-158
docker-compose.postgres.yaml
···
-
# Docker Compose with PostgreSQL
-
#
-
# Usage:
-
# docker-compose -f docker-compose.postgres.yaml up -d
-
#
-
# This file extends the base docker-compose.yaml with a PostgreSQL database.
-
# Set the following in your .env file:
-
# COCOON_DB_TYPE=postgres
-
# POSTGRES_PASSWORD=your-secure-password
-
-
version: '3.8'
-
-
services:
-
postgres:
-
image: postgres:16-alpine
-
container_name: cocoon-postgres
-
environment:
-
POSTGRES_USER: cocoon
-
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:?POSTGRES_PASSWORD is required}
-
POSTGRES_DB: cocoon
-
volumes:
-
- postgres_data:/var/lib/postgresql/data
-
healthcheck:
-
test: ["CMD-SHELL", "pg_isready -U cocoon -d cocoon"]
-
interval: 10s
-
timeout: 5s
-
retries: 5
-
restart: unless-stopped
-
-
init-keys:
-
build:
-
context: .
-
dockerfile: Dockerfile
-
image: ghcr.io/haileyok/cocoon:latest
-
container_name: cocoon-init-keys
-
volumes:
-
- ./keys:/keys
-
- ./data:/data/cocoon
-
- ./init-keys.sh:/init-keys.sh:ro
-
environment:
-
COCOON_DID: ${COCOON_DID}
-
COCOON_HOSTNAME: ${COCOON_HOSTNAME}
-
COCOON_ROTATION_KEY_PATH: /keys/rotation.key
-
COCOON_JWK_PATH: /keys/jwk.key
-
COCOON_CONTACT_EMAIL: ${COCOON_CONTACT_EMAIL}
-
COCOON_RELAYS: ${COCOON_RELAYS:-https://bsky.network}
-
COCOON_ADMIN_PASSWORD: ${COCOON_ADMIN_PASSWORD}
-
entrypoint: ["/bin/sh", "/init-keys.sh"]
-
restart: "no"
-
-
cocoon:
-
build:
-
context: .
-
dockerfile: Dockerfile
-
image: ghcr.io/haileyok/cocoon:latest
-
container_name: cocoon-pds
-
depends_on:
-
init-keys:
-
condition: service_completed_successfully
-
postgres:
-
condition: service_healthy
-
ports:
-
- "8080:8080"
-
volumes:
-
- ./data:/data/cocoon
-
- ./keys/rotation.key:/keys/rotation.key:ro
-
- ./keys/jwk.key:/keys/jwk.key:ro
-
environment:
-
# Required settings
-
COCOON_DID: ${COCOON_DID}
-
COCOON_HOSTNAME: ${COCOON_HOSTNAME}
-
COCOON_ROTATION_KEY_PATH: /keys/rotation.key
-
COCOON_JWK_PATH: /keys/jwk.key
-
COCOON_CONTACT_EMAIL: ${COCOON_CONTACT_EMAIL}
-
COCOON_RELAYS: ${COCOON_RELAYS:-https://bsky.network}
-
COCOON_ADMIN_PASSWORD: ${COCOON_ADMIN_PASSWORD}
-
COCOON_SESSION_SECRET: ${COCOON_SESSION_SECRET}
-
-
# Database configuration - PostgreSQL
-
COCOON_ADDR: ":8080"
-
COCOON_DB_TYPE: postgres
-
COCOON_DATABASE_URL: postgres://cocoon:${POSTGRES_PASSWORD}@postgres:5432/cocoon?sslmode=disable
-
COCOON_BLOCKSTORE_VARIANT: ${COCOON_BLOCKSTORE_VARIANT:-sqlite}
-
-
# Optional: SMTP settings for email
-
COCOON_SMTP_USER: ${COCOON_SMTP_USER:-}
-
COCOON_SMTP_PASS: ${COCOON_SMTP_PASS:-}
-
COCOON_SMTP_HOST: ${COCOON_SMTP_HOST:-}
-
COCOON_SMTP_PORT: ${COCOON_SMTP_PORT:-}
-
COCOON_SMTP_EMAIL: ${COCOON_SMTP_EMAIL:-}
-
COCOON_SMTP_NAME: ${COCOON_SMTP_NAME:-}
-
-
# Optional: S3 configuration
-
COCOON_S3_BACKUPS_ENABLED: ${COCOON_S3_BACKUPS_ENABLED:-false}
-
COCOON_S3_BLOBSTORE_ENABLED: ${COCOON_S3_BLOBSTORE_ENABLED:-false}
-
COCOON_S3_REGION: ${COCOON_S3_REGION:-}
-
COCOON_S3_BUCKET: ${COCOON_S3_BUCKET:-}
-
COCOON_S3_ENDPOINT: ${COCOON_S3_ENDPOINT:-}
-
COCOON_S3_ACCESS_KEY: ${COCOON_S3_ACCESS_KEY:-}
-
COCOON_S3_SECRET_KEY: ${COCOON_S3_SECRET_KEY:-}
-
-
# Optional: Fallback proxy
-
COCOON_FALLBACK_PROXY: ${COCOON_FALLBACK_PROXY:-}
-
restart: unless-stopped
-
healthcheck:
-
test: ["CMD", "curl", "-f", "http://localhost:8080/xrpc/_health"]
-
interval: 30s
-
timeout: 10s
-
retries: 3
-
start_period: 40s
-
-
create-invite:
-
build:
-
context: .
-
dockerfile: Dockerfile
-
image: ghcr.io/haileyok/cocoon:latest
-
container_name: cocoon-create-invite
-
volumes:
-
- ./keys:/keys
-
- ./create-initial-invite.sh:/create-initial-invite.sh:ro
-
environment:
-
COCOON_DID: ${COCOON_DID}
-
COCOON_HOSTNAME: ${COCOON_HOSTNAME}
-
COCOON_ROTATION_KEY_PATH: /keys/rotation.key
-
COCOON_JWK_PATH: /keys/jwk.key
-
COCOON_CONTACT_EMAIL: ${COCOON_CONTACT_EMAIL}
-
COCOON_RELAYS: ${COCOON_RELAYS:-https://bsky.network}
-
COCOON_ADMIN_PASSWORD: ${COCOON_ADMIN_PASSWORD}
-
COCOON_DB_TYPE: postgres
-
COCOON_DATABASE_URL: postgres://cocoon:${POSTGRES_PASSWORD}@postgres:5432/cocoon?sslmode=disable
-
depends_on:
-
cocoon:
-
condition: service_healthy
-
entrypoint: ["/bin/sh", "/create-initial-invite.sh"]
-
restart: "no"
-
-
caddy:
-
image: caddy:2-alpine
-
container_name: cocoon-caddy
-
ports:
-
- "80:80"
-
- "443:443"
-
volumes:
-
- ./Caddyfile.postgres:/etc/caddy/Caddyfile:ro
-
- caddy_data:/data
-
- caddy_config:/config
-
restart: unless-stopped
-
environment:
-
COCOON_HOSTNAME: ${COCOON_HOSTNAME}
-
CADDY_ACME_EMAIL: ${COCOON_CONTACT_EMAIL:-}
-
-
volumes:
-
postgres_data:
-
driver: local
-
caddy_data:
-
driver: local
-
caddy_config:
-
driver: local
+2 -6
docker-compose.yaml
···
# Server configuration
COCOON_ADDR: ":8080"
-
COCOON_DB_TYPE: ${COCOON_DB_TYPE:-sqlite}
-
COCOON_DB_NAME: ${COCOON_DB_NAME:-/data/cocoon/cocoon.db}
-
COCOON_DATABASE_URL: ${COCOON_DATABASE_URL:-}
+
COCOON_DB_NAME: /data/cocoon/cocoon.db
COCOON_BLOCKSTORE_VARIANT: ${COCOON_BLOCKSTORE_VARIANT:-sqlite}
# Optional: SMTP settings for email
···
COCOON_CONTACT_EMAIL: ${COCOON_CONTACT_EMAIL}
COCOON_RELAYS: ${COCOON_RELAYS:-https://bsky.network}
COCOON_ADMIN_PASSWORD: ${COCOON_ADMIN_PASSWORD}
-
COCOON_DB_TYPE: ${COCOON_DB_TYPE:-sqlite}
-
COCOON_DB_NAME: ${COCOON_DB_NAME:-/data/cocoon/cocoon.db}
-
COCOON_DATABASE_URL: ${COCOON_DATABASE_URL:-}
+
COCOON_DB_NAME: /data/cocoon/cocoon.db
depends_on:
- init-keys
entrypoint: ["/bin/sh", "/create-initial-invite.sh"]
+1
server/handle_account.go
···
func (s *Server) handleAccount(e echo.Context) error {
ctx := e.Request().Context()
+
repo, sess, err := s.getSessionRepoOrErr(e)
if err != nil {
return e.Redirect(303, "/account/signin")
+8 -7
server/handle_import_repo.go
···
import (
"bytes"
-
"context"
"io"
"slices"
"strings"
···
)
func (s *Server) handleRepoImportRepo(e echo.Context) error {
+
ctx := e.Request().Context()
+
urepo := e.Get("repo").(*models.RepoActor)
b, err := io.ReadAll(e.Request().Body)
···
slices.Reverse(orderedBlocks)
-
if err := bs.PutMany(context.TODO(), orderedBlocks); err != nil {
+
if err := bs.PutMany(ctx, orderedBlocks); err != nil {
s.logger.Error("could not insert blocks", "error", err)
return helpers.ServerError(e, nil)
}
-
r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0])
+
r, err := repo.OpenRepo(ctx, bs, cs.Header.Roots[0])
if err != nil {
s.logger.Error("could not open repo", "error", err)
return helpers.ServerError(e, nil)
···
clock := syntax.NewTIDClock(0)
-
if err := r.ForEach(context.TODO(), "", func(key string, cid cid.Cid) error {
+
if err := r.ForEach(ctx, "", func(key string, cid cid.Cid) error {
pts := strings.Split(key, "/")
nsid := pts[0]
rkey := pts[1]
cidStr := cid.String()
-
b, err := bs.Get(context.TODO(), cid)
+
b, err := bs.Get(ctx, cid)
if err != nil {
s.logger.Error("record bytes don't exist in blockstore", "error", err)
return helpers.ServerError(e, nil)
···
tx.Commit()
-
root, rev, err := r.Commit(context.TODO(), urepo.SignFor)
+
root, rev, err := r.Commit(ctx, urepo.SignFor)
if err != nil {
s.logger.Error("error committing", "error", err)
return helpers.ServerError(e, nil)
}
-
if err := s.UpdateRepo(context.TODO(), urepo.Repo.Did, root, rev); err != nil {
+
if err := s.UpdateRepo(ctx, urepo.Repo.Did, root, rev); err != nil {
s.logger.Error("error updating repo after commit", "error", err)
return helpers.ServerError(e, nil)
}
+3 -1
server/handle_repo_apply_writes.go
···
}
func (s *Server) handleApplyWrites(e echo.Context) error {
+
ctx := e.Request().Context()
+
repo := e.Get("repo").(*models.RepoActor)
var req ComAtprotoRepoApplyWritesRequest
···
})
}
-
results, err := s.repoman.applyWrites(repo.Repo, ops, req.SwapCommit)
+
results, err := s.repoman.applyWrites(ctx, repo.Repo, ops, req.SwapCommit)
if err != nil {
s.logger.Error("error applying writes", "error", err)
return helpers.ServerError(e, nil)
+3 -1
server/handle_repo_create_record.go
···
}
func (s *Server) handleCreateRecord(e echo.Context) error {
+
ctx := e.Request().Context()
+
repo := e.Get("repo").(*models.RepoActor)
var req ComAtprotoRepoCreateRecordRequest
···
optype = OpTypeUpdate
}
-
results, err := s.repoman.applyWrites(repo.Repo, []Op{
+
results, err := s.repoman.applyWrites(ctx, repo.Repo, []Op{
{
Type: optype,
Collection: req.Collection,
+3 -1
server/handle_repo_delete_record.go
···
}
func (s *Server) handleDeleteRecord(e echo.Context) error {
+
ctx := e.Request().Context()
+
repo := e.Get("repo").(*models.RepoActor)
var req ComAtprotoRepoDeleteRecordRequest
···
return helpers.InputError(e, nil)
}
-
results, err := s.repoman.applyWrites(repo.Repo, []Op{
+
results, err := s.repoman.applyWrites(ctx, repo.Repo, []Op{
{
Type: OpTypeDelete,
Collection: req.Collection,
-21
server/handle_repo_list_missing_blobs.go
···
-
package server
-
-
import (
-
"github.com/labstack/echo/v4"
-
)
-
-
type ComAtprotoRepoListMissingBlobsResponse struct {
-
Cursor *string `json:"cursor,omitempty"`
-
Blobs []ComAtprotoRepoListMissingBlobsRecordBlob `json:"blobs"`
-
}
-
-
type ComAtprotoRepoListMissingBlobsRecordBlob struct {
-
Cid string `json:"cid"`
-
RecordUri string `json:"recordUri"`
-
}
-
-
func (s *Server) handleListMissingBlobs(e echo.Context) error {
-
return e.JSON(200, ComAtprotoRepoListMissingBlobsResponse{
-
Blobs: []ComAtprotoRepoListMissingBlobsRecordBlob{},
-
})
-
}
+3 -1
server/handle_repo_put_record.go
···
}
func (s *Server) handlePutRecord(e echo.Context) error {
+
ctx := e.Request().Context()
+
repo := e.Get("repo").(*models.RepoActor)
var req ComAtprotoRepoPutRecordRequest
···
optype = OpTypeUpdate
}
-
results, err := s.repoman.applyWrites(repo.Repo, []Op{
+
results, err := s.repoman.applyWrites(ctx, repo.Repo, []Op{
{
Type: optype,
Collection: req.Collection,
+46 -15
server/handle_server_check_account_status.go
···
package server
import (
+
"errors"
+
"sync"
+
+
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/haileyok/cocoon/internal/helpers"
"github.com/haileyok/cocoon/models"
"github.com/ipfs/go-cid"
···
func (s *Server) handleServerCheckAccountStatus(e echo.Context) error {
urepo := e.Get("repo").(*models.RepoActor)
+
_, didErr := syntax.ParseDID(urepo.Repo.Did)
+
if didErr != nil {
+
s.logger.Error("error validating did", "err", didErr)
+
}
+
resp := ComAtprotoServerCheckAccountStatusResponse{
Activated: true, // TODO: should allow for deactivation etc.
-
ValidDid: true, // TODO: should probably verify?
+
ValidDid: didErr == nil,
RepoRev: urepo.Rev,
ImportedBlobs: 0, // TODO: ???
}
···
s.logger.Error("error casting cid", "error", err)
return helpers.ServerError(e, nil)
}
+
resp.RepoCommit = rootcid.String()
type CountResp struct {
···
}
var blockCtResp CountResp
-
if err := s.db.Raw("SELECT COUNT(*) AS ct FROM blocks WHERE did = ?", nil, urepo.Repo.Did).Scan(&blockCtResp).Error; err != nil {
-
s.logger.Error("error getting block count", "error", err)
-
return helpers.ServerError(e, nil)
-
}
-
resp.RepoBlocks = blockCtResp.Ct
+
var recCtResp CountResp
+
var blobCtResp CountResp
+
+
var wg sync.WaitGroup
+
var procErr error
+
+
wg.Add(1)
+
go func() {
+
defer wg.Done()
+
if err := s.db.Raw("SELECT COUNT(*) AS ct FROM blocks WHERE did = ?", nil, urepo.Repo.Did).Scan(&blockCtResp).Error; err != nil {
+
s.logger.Error("error getting block count", "error", err)
+
procErr = errors.Join(procErr, err)
+
}
+
}()
+
+
wg.Add(1)
+
go func() {
+
defer wg.Done()
+
if err := s.db.Raw("SELECT COUNT(*) AS ct FROM records WHERE did = ?", nil, urepo.Repo.Did).Scan(&recCtResp).Error; err != nil {
+
s.logger.Error("error getting record count", "error", err)
+
procErr = errors.Join(procErr, err)
+
}
+
}()
-
var recCtResp CountResp
-
if err := s.db.Raw("SELECT COUNT(*) AS ct FROM records WHERE did = ?", nil, urepo.Repo.Did).Scan(&recCtResp).Error; err != nil {
-
s.logger.Error("error getting record count", "error", err)
-
return helpers.ServerError(e, nil)
-
}
-
resp.IndexedRecords = recCtResp.Ct
+
wg.Add(1)
+
go func() {
+
if err := s.db.Raw("SELECT COUNT(*) AS ct FROM blobs WHERE did = ?", nil, urepo.Repo.Did).Scan(&blobCtResp).Error; err != nil {
+
s.logger.Error("error getting expected blobs count", "error", err)
+
procErr = errors.Join(procErr, err)
+
}
+
}()
-
var blobCtResp CountResp
-
if err := s.db.Raw("SELECT COUNT(*) AS ct FROM blobs WHERE did = ?", nil, urepo.Repo.Did).Scan(&blobCtResp).Error; err != nil {
-
s.logger.Error("error getting record count", "error", err)
+
wg.Wait()
+
if procErr != nil {
return helpers.ServerError(e, nil)
}
+
+
resp.RepoBlocks = blockCtResp.Ct
+
resp.IndexedRecords = recCtResp.Ct
resp.ExpectedBlobs = blobCtResp.Ct
return e.JSON(200, resp)
+3 -1
server/handle_sync_get_record.go
···
)
func (s *Server) handleSyncGetRecord(e echo.Context) error {
+
ctx := e.Request().Context()
+
did := e.QueryParam("did")
collection := e.QueryParam("collection")
rkey := e.QueryParam("rkey")
···
return helpers.ServerError(e, nil)
}
-
root, blocks, err := s.repoman.getRecordProof(urepo, collection, rkey)
+
root, blocks, err := s.repoman.getRecordProof(ctx, urepo, collection, rkey)
if err != nil {
return err
}
+19 -16
server/repo.go
···
}
// TODO make use of swap commit
-
func (rm *RepoMan) applyWrites(urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) {
+
func (rm *RepoMan) applyWrites(ctx context.Context, urepo models.Repo, writes []Op, swapCommit *string) ([]ApplyWriteResult, error) {
rootcid, err := cid.Cast(urepo.Root)
if err != nil {
return nil, err
···
dbs := rm.s.getBlockstore(urepo.Did)
bs := recording_blockstore.New(dbs)
-
r, err := repo.OpenRepo(context.TODO(), bs, rootcid)
+
r, err := repo.OpenRepo(ctx, bs, rootcid)
-
entries := []models.Record{}
-
var results []ApplyWriteResult
+
entries := make([]models.Record, 0, len(writes))
+
results := make([]ApplyWriteResult, 0, len(writes))
for i, op := range writes {
if op.Type != OpTypeCreate && op.Rkey == nil {
return nil, fmt.Errorf("invalid rkey")
} else if op.Type == OpTypeCreate && op.Rkey != nil {
-
_, _, err := r.GetRecord(context.TODO(), op.Collection+"/"+*op.Rkey)
+
_, _, err := r.GetRecord(ctx, op.Collection+"/"+*op.Rkey)
if err == nil {
op.Type = OpTypeUpdate
}
···
mm["$type"] = op.Collection
}
-
nc, err := r.PutRecord(context.TODO(), op.Collection+"/"+*op.Rkey, &mm)
+
nc, err := r.PutRecord(ctx, op.Collection+"/"+*op.Rkey, &mm)
if err != nil {
return nil, err
}
···
Rkey: *op.Rkey,
Value: old.Value,
})
-
err := r.DeleteRecord(context.TODO(), op.Collection+"/"+*op.Rkey)
+
err := r.DeleteRecord(ctx, op.Collection+"/"+*op.Rkey)
if err != nil {
return nil, err
}
···
return nil, err
}
mm := MarshalableMap(out)
-
nc, err := r.UpdateRecord(context.TODO(), op.Collection+"/"+*op.Rkey, &mm)
+
nc, err := r.UpdateRecord(ctx, op.Collection+"/"+*op.Rkey, &mm)
if err != nil {
return nil, err
}
···
}
}
-
newroot, rev, err := r.Commit(context.TODO(), urepo.SignFor)
+
newroot, rev, err := r.Commit(ctx, urepo.SignFor)
if err != nil {
return nil, err
}
···
Roots: []cid.Cid{newroot},
Version: 1,
})
+
if err != nil {
+
return nil, err
+
}
if _, err := carstore.LdWrite(buf, hb); err != nil {
return nil, err
}
-
diffops, err := r.DiffSince(context.TODO(), rootcid)
+
diffops, err := r.DiffSince(ctx, rootcid)
if err != nil {
return nil, err
}
···
})
}
-
blk, err := dbs.Get(context.TODO(), c)
+
blk, err := dbs.Get(ctx, c)
if err != nil {
return nil, err
}
···
}
}
-
rm.s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
+
rm.s.evtman.AddEvent(ctx, &events.XRPCStreamEvent{
RepoCommit: &atproto.SyncSubscribeRepos_Commit{
Repo: urepo.Did,
Blocks: buf.Bytes(),
···
},
})
-
if err := rm.s.UpdateRepo(context.TODO(), urepo.Did, newroot, rev); err != nil {
+
if err := rm.s.UpdateRepo(ctx, urepo.Did, newroot, rev); err != nil {
return nil, err
}
···
return results, nil
}
-
func (rm *RepoMan) getRecordProof(urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) {
+
func (rm *RepoMan) getRecordProof(ctx context.Context, urepo models.Repo, collection, rkey string) (cid.Cid, []blocks.Block, error) {
c, err := cid.Cast(urepo.Root)
if err != nil {
return cid.Undef, nil, err
···
dbs := rm.s.getBlockstore(urepo.Did)
bs := recording_blockstore.New(dbs)
-
r, err := repo.OpenRepo(context.TODO(), bs, c)
+
r, err := repo.OpenRepo(ctx, bs, c)
if err != nil {
return cid.Undef, nil, err
}
-
_, _, err = r.GetRecordBytes(context.TODO(), collection+"/"+rkey)
+
_, _, err = r.GetRecordBytes(ctx, collection+"/"+rkey)
if err != nil {
return cid.Undef, nil, err
}
+3 -34
server/server.go
···
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
slogecho "github.com/samber/slog-echo"
-
"gorm.io/driver/postgres"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
···
requestCrawlMu sync.Mutex
dbName string
-
dbType string
s3Config *S3Config
}
type Args struct {
Addr string
DbName string
-
DbType string
-
DatabaseURL string
Logger *slog.Logger
Version string
Did string
···
IdleTimeout: 5 * time.Minute,
}
-
dbType := args.DbType
-
if dbType == "" {
-
dbType = "sqlite"
-
}
-
-
var gdb *gorm.DB
-
var err error
-
switch dbType {
-
case "postgres":
-
if args.DatabaseURL == "" {
-
return nil, fmt.Errorf("database-url must be set when using postgres")
-
}
-
gdb, err = gorm.Open(postgres.Open(args.DatabaseURL), &gorm.Config{})
-
if err != nil {
-
return nil, fmt.Errorf("failed to connect to postgres: %w", err)
-
}
-
args.Logger.Info("connected to PostgreSQL database")
-
default:
-
gdb, err = gorm.Open(sqlite.Open(args.DbName), &gorm.Config{})
-
if err != nil {
-
return nil, fmt.Errorf("failed to open sqlite database: %w", err)
-
}
-
args.Logger.Info("connected to SQLite database", "path", args.DbName)
+
gdb, err := gorm.Open(sqlite.Open(args.DbName), &gorm.Config{})
+
if err != nil {
+
return nil, err
}
dbw := db.NewDB(gdb)
···
passport: identity.NewPassport(h, identity.NewMemCache(10_000)),
dbName: args.DbName,
-
dbType: dbType,
s3Config: args.S3Config,
oauthProvider: provider.NewProvider(provider.Args{
···
s.echo.GET("/xrpc/com.atproto.repo.describeRepo", s.handleDescribeRepo)
s.echo.GET("/xrpc/com.atproto.sync.listRepos", s.handleListRepos)
s.echo.GET("/xrpc/com.atproto.repo.listRecords", s.handleListRecords)
-
s.echo.GET("/xrpc/com.atproto.repo.listMissingBlobs", s.handleListMissingBlobs)
s.echo.GET("/xrpc/com.atproto.repo.getRecord", s.handleRepoGetRecord)
s.echo.GET("/xrpc/com.atproto.sync.getRecord", s.handleSyncGetRecord)
s.echo.GET("/xrpc/com.atproto.sync.getBlocks", s.handleGetBlocks)
···
}
func (s *Server) doBackup() {
-
if s.dbType == "postgres" {
-
s.logger.Info("skipping S3 backup - PostgreSQL backups should be handled externally (pg_dump, managed database backups, etc.)")
-
return
-
}
-
start := time.Now()
s.logger.Info("beginning backup to s3...")