An atproto PDS written in Go

refactor blockstore to be extensible (#21)

* refactor blockstore to be extensible

* add configuration for setting blockstore variant

* use the configuration when getting blockstore

+1 -9
blockstore/blockstore.go sqlite_blockstore/sqlite_blockstore.go
···
-
package blockstore
import (
"context"
···
func (bs *SqliteBlockstore) HashOnRead(enabled bool) {
panic("not implemented")
-
}
-
-
func (bs *SqliteBlockstore) UpdateRepo(ctx context.Context, root cid.Cid, rev string) error {
-
if err := bs.db.Exec("UPDATE repos SET root = ?, rev = ? WHERE did = ?", nil, root.Bytes(), rev, bs.did).Error; err != nil {
-
return err
-
}
-
-
return nil
}
func (bs *SqliteBlockstore) Execute(ctx context.Context) error {
···
+
package sqlite_blockstore
import (
"context"
···
func (bs *SqliteBlockstore) HashOnRead(enabled bool) {
panic("not implemented")
}
func (bs *SqliteBlockstore) Execute(ctx context.Context) error {
+7
cmd/cocoon/main.go
···
EnvVars: []string{"COCOON_DEFAULT_ATPROTO_PROXY"},
Value: "did:web:api.bsky.app#bsky_appview",
},
},
Commands: []*cli.Command{
runServe,
···
Usage: "Start the cocoon PDS",
Flags: []cli.Flag{},
Action: func(cmd *cli.Context) error {
s, err := server.New(&server.Args{
Addr: cmd.String("addr"),
DbName: cmd.String("db-name"),
···
},
SessionSecret: cmd.String("session-secret"),
DefaultAtprotoProxy: cmd.String("default-atproto-proxy"),
})
if err != nil {
fmt.Printf("error creating cocoon: %v", err)
···
EnvVars: []string{"COCOON_DEFAULT_ATPROTO_PROXY"},
Value: "did:web:api.bsky.app#bsky_appview",
},
+
&cli.StringFlag{
+
Name: "blockstore-variant",
+
EnvVars: []string{"COCOON_BLOCKSTORE_VARIANT"},
+
Value: "sqlite",
+
},
},
Commands: []*cli.Command{
runServe,
···
Usage: "Start the cocoon PDS",
Flags: []cli.Flag{},
Action: func(cmd *cli.Context) error {
+
s, err := server.New(&server.Args{
Addr: cmd.String("addr"),
DbName: cmd.String("db-name"),
···
},
SessionSecret: cmd.String("session-secret"),
DefaultAtprotoProxy: cmd.String("default-atproto-proxy"),
+
BlockstoreVariant: server.MustReturnBlockstoreVariant(cmd.String("blockstore-variant")),
})
if err != nil {
fmt.Printf("error creating cocoon: %v", err)
+77
recording_blockstore/recording_blockstore.go
···
···
+
package recording_blockstore
+
+
import (
+
"context"
+
+
blockformat "github.com/ipfs/go-block-format"
+
"github.com/ipfs/go-cid"
+
blockstore "github.com/ipfs/go-ipfs-blockstore"
+
)
+
+
type RecordingBlockstore struct {
+
base blockstore.Blockstore
+
+
inserts map[cid.Cid]blockformat.Block
+
}
+
+
func New(base blockstore.Blockstore) *RecordingBlockstore {
+
return &RecordingBlockstore{
+
base: base,
+
inserts: make(map[cid.Cid]blockformat.Block),
+
}
+
}
+
+
func (bs *RecordingBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
+
return bs.base.Has(ctx, c)
+
}
+
+
func (bs *RecordingBlockstore) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) {
+
return bs.base.Get(ctx, c)
+
}
+
+
func (bs *RecordingBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
+
return bs.base.GetSize(ctx, c)
+
}
+
+
func (bs *RecordingBlockstore) DeleteBlock(ctx context.Context, c cid.Cid) error {
+
return bs.base.DeleteBlock(ctx, c)
+
}
+
+
func (bs *RecordingBlockstore) Put(ctx context.Context, block blockformat.Block) error {
+
if err := bs.base.Put(ctx, block); err != nil {
+
return err
+
}
+
bs.inserts[block.Cid()] = block
+
return nil
+
}
+
+
func (bs *RecordingBlockstore) PutMany(ctx context.Context, blocks []blockformat.Block) error {
+
if err := bs.base.PutMany(ctx, blocks); err != nil {
+
return err
+
}
+
+
for _, b := range blocks {
+
bs.inserts[b.Cid()] = b
+
}
+
+
return nil
+
}
+
+
func (bs *RecordingBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
+
return bs.AllKeysChan(ctx)
+
}
+
+
func (bs *RecordingBlockstore) HashOnRead(enabled bool) {
+
}
+
+
func (bs *RecordingBlockstore) GetLogMap() map[cid.Cid]blockformat.Block {
+
return bs.inserts
+
}
+
+
func (bs *RecordingBlockstore) GetLogArray() []blockformat.Block {
+
var blocks []blockformat.Block
+
for _, b := range bs.inserts {
+
blocks = append(blocks, b)
+
}
+
return blocks
+
}
+30
server/blockstore_variant.go
···
···
+
package server
+
+
import (
+
"github.com/haileyok/cocoon/sqlite_blockstore"
+
blockstore "github.com/ipfs/go-ipfs-blockstore"
+
)
+
+
type BlockstoreVariant int
+
+
const (
+
BlockstoreVariantSqlite = iota
+
)
+
+
func MustReturnBlockstoreVariant(maybeBsv string) BlockstoreVariant {
+
switch maybeBsv {
+
case "sqlite":
+
return BlockstoreVariantSqlite
+
default:
+
panic("invalid blockstore variant provided")
+
}
+
}
+
+
func (s *Server) getBlockstore(did string) blockstore.Blockstore {
+
switch s.config.BlockstoreVariant {
+
case BlockstoreVariantSqlite:
+
return sqlite_blockstore.New(did, s.db)
+
default:
+
return sqlite_blockstore.New(did, s.db)
+
}
+
}
+2 -3
server/handle_import_repo.go
···
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/bluesky-social/indigo/repo"
-
"github.com/haileyok/cocoon/blockstore"
"github.com/haileyok/cocoon/internal/helpers"
"github.com/haileyok/cocoon/models"
blocks "github.com/ipfs/go-block-format"
···
return helpers.ServerError(e, nil)
}
-
bs := blockstore.New(urepo.Repo.Did, s.db)
cs, err := car.NewCarReader(bytes.NewReader(b))
if err != nil {
···
return helpers.ServerError(e, nil)
}
-
if err := bs.UpdateRepo(context.TODO(), root, rev); err != nil {
s.logger.Error("error updating repo after commit", "error", err)
return helpers.ServerError(e, nil)
}
···
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/bluesky-social/indigo/repo"
"github.com/haileyok/cocoon/internal/helpers"
"github.com/haileyok/cocoon/models"
blocks "github.com/ipfs/go-block-format"
···
return helpers.ServerError(e, nil)
}
+
bs := s.getBlockstore(urepo.Repo.Did)
cs, err := car.NewCarReader(bytes.NewReader(b))
if err != nil {
···
return helpers.ServerError(e, nil)
}
+
if err := s.UpdateRepo(context.TODO(), urepo.Repo.Did, root, rev); err != nil {
s.logger.Error("error updating repo after commit", "error", err)
return helpers.ServerError(e, nil)
}
+2 -3
server/handle_server_create_account.go
···
"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/repo"
"github.com/bluesky-social/indigo/util"
-
"github.com/haileyok/cocoon/blockstore"
"github.com/haileyok/cocoon/internal/helpers"
"github.com/haileyok/cocoon/models"
"github.com/labstack/echo/v4"
···
}
if customDidHeader == "" {
-
bs := blockstore.New(signupDid, s.db)
r := repo.NewRepo(context.TODO(), signupDid, bs)
root, rev, err := r.Commit(context.TODO(), urepo.SignFor)
···
return helpers.ServerError(e, nil)
}
-
if err := bs.UpdateRepo(context.TODO(), root, rev); err != nil {
s.logger.Error("error updating repo after commit", "error", err)
return helpers.ServerError(e, nil)
}
···
"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/repo"
"github.com/bluesky-social/indigo/util"
"github.com/haileyok/cocoon/internal/helpers"
"github.com/haileyok/cocoon/models"
"github.com/labstack/echo/v4"
···
}
if customDidHeader == "" {
+
bs := s.getBlockstore(signupDid)
r := repo.NewRepo(context.TODO(), signupDid, bs)
root, rev, err := r.Commit(context.TODO(), urepo.SignFor)
···
return helpers.ServerError(e, nil)
}
+
if err := s.UpdateRepo(context.TODO(), urepo.Did, root, rev); err != nil {
s.logger.Error("error updating repo after commit", "error", err)
return helpers.ServerError(e, nil)
}
+1 -2
server/handle_sync_get_blocks.go
···
"strings"
"github.com/bluesky-social/indigo/carstore"
-
"github.com/haileyok/cocoon/blockstore"
"github.com/haileyok/cocoon/internal/helpers"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
···
return helpers.ServerError(e, nil)
}
-
bs := blockstore.New(urepo.Repo.Did, s.db)
for _, c := range cids {
b, err := bs.Get(context.TODO(), c)
···
"strings"
"github.com/bluesky-social/indigo/carstore"
"github.com/haileyok/cocoon/internal/helpers"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
···
return helpers.ServerError(e, nil)
}
+
bs := s.getBlockstore(urepo.Repo.Did)
for _, c := range cids {
b, err := bs.Get(context.TODO(), c)
+12 -12
server/repo.go
···
"github.com/bluesky-social/indigo/events"
lexutil "github.com/bluesky-social/indigo/lex/util"
"github.com/bluesky-social/indigo/repo"
-
"github.com/bluesky-social/indigo/util"
-
"github.com/haileyok/cocoon/blockstore"
"github.com/haileyok/cocoon/internal/db"
"github.com/haileyok/cocoon/models"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
···
return nil, err
}
-
dbs := blockstore.New(urepo.Did, rm.db)
r, err := repo.OpenRepo(context.TODO(), dbs, rootcid)
entries := []models.Record{}
···
}
}
-
for _, op := range dbs.GetLog() {
if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil {
return nil, err
}
···
},
})
-
if err := dbs.UpdateRepo(context.TODO(), newroot, rev); err != nil {
return nil, err
}
···
return cid.Undef, nil, err
}
-
dbs := blockstore.New(urepo.Did, rm.db)
-
bs := util.NewLoggingBstore(dbs)
r, err := repo.OpenRepo(context.TODO(), bs, c)
if err != nil {
···
return cid.Undef, nil, err
}
-
return c, bs.GetLoggedBlocks(), nil
}
func (rm *RepoMan) incrementBlobRefs(urepo models.Repo, cbor []byte) ([]cid.Cid, error) {
···
return nil, fmt.Errorf("error unmarshaling cbor: %w", err)
}
-
var deepiter func(interface{}) error
-
deepiter = func(item interface{}) error {
switch val := item.(type) {
-
case map[string]interface{}:
if val["$type"] == "blob" {
if ref, ok := val["ref"].(string); ok {
c, err := cid.Parse(ref)
···
return deepiter(v)
}
}
-
case []interface{}:
for _, v := range val {
deepiter(v)
}
···
"github.com/bluesky-social/indigo/events"
lexutil "github.com/bluesky-social/indigo/lex/util"
"github.com/bluesky-social/indigo/repo"
"github.com/haileyok/cocoon/internal/db"
"github.com/haileyok/cocoon/models"
+
"github.com/haileyok/cocoon/recording_blockstore"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
···
return nil, err
}
+
dbs := rm.s.getBlockstore(urepo.Did)
+
bs := recording_blockstore.New(dbs)
r, err := repo.OpenRepo(context.TODO(), dbs, rootcid)
entries := []models.Record{}
···
}
}
+
for _, op := range bs.GetLogMap() {
if _, err := carstore.LdWrite(buf, op.Cid().Bytes(), op.RawData()); err != nil {
return nil, err
}
···
},
})
+
if err := rm.s.UpdateRepo(context.TODO(), urepo.Did, newroot, rev); err != nil {
return nil, err
}
···
return cid.Undef, nil, err
}
+
dbs := rm.s.getBlockstore(urepo.Did)
+
bs := recording_blockstore.New(dbs)
r, err := repo.OpenRepo(context.TODO(), bs, c)
if err != nil {
···
return cid.Undef, nil, err
}
+
return c, bs.GetLogArray(), nil
}
func (rm *RepoMan) incrementBlobRefs(urepo models.Repo, cbor []byte) ([]cid.Cid, error) {
···
return nil, fmt.Errorf("error unmarshaling cbor: %w", err)
}
+
var deepiter func(any) error
+
deepiter = func(item any) error {
switch val := item.(type) {
+
case map[string]any:
if val["$type"] == "blob" {
if ref, ok := val["ref"].(string); ok {
c, err := cid.Parse(ref)
···
return deepiter(v)
}
}
+
case []any:
for _, v := range val {
deepiter(v)
}
+13
server/server.go
···
"github.com/haileyok/cocoon/oauth/dpop"
"github.com/haileyok/cocoon/oauth/provider"
"github.com/haileyok/cocoon/plc"
echo_session "github.com/labstack/echo-contrib/session"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
···
SessionSecret string
DefaultAtprotoProxy string
}
type config struct {
···
SmtpEmail string
SmtpName string
DefaultAtprotoProxy string
}
type CustomValidator struct {
···
SmtpName: args.SmtpName,
SmtpEmail: args.SmtpEmail,
DefaultAtprotoProxy: args.DefaultAtprotoProxy,
},
evtman: events.NewEventManager(events.NewMemPersister()),
passport: identity.NewPassport(h, identity.NewMemCache(10_000)),
···
go s.doBackup()
}
}
···
"github.com/haileyok/cocoon/oauth/dpop"
"github.com/haileyok/cocoon/oauth/provider"
"github.com/haileyok/cocoon/plc"
+
"github.com/ipfs/go-cid"
echo_session "github.com/labstack/echo-contrib/session"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
···
SessionSecret string
DefaultAtprotoProxy string
+
+
BlockstoreVariant BlockstoreVariant
}
type config struct {
···
SmtpEmail string
SmtpName string
DefaultAtprotoProxy string
+
BlockstoreVariant BlockstoreVariant
}
type CustomValidator struct {
···
SmtpName: args.SmtpName,
SmtpEmail: args.SmtpEmail,
DefaultAtprotoProxy: args.DefaultAtprotoProxy,
+
BlockstoreVariant: args.BlockstoreVariant,
},
evtman: events.NewEventManager(events.NewMemPersister()),
passport: identity.NewPassport(h, identity.NewMemCache(10_000)),
···
go s.doBackup()
}
}
+
+
func (s *Server) UpdateRepo(ctx context.Context, did string, root cid.Cid, rev string) error {
+
if err := s.db.Exec("UPDATE repos SET root = ?, rev = ? WHERE did = ?", nil, root.Bytes(), rev, did).Error; err != nil {
+
return err
+
}
+
+
return nil
+
}