An atproto PDS written in Go

half finished import repo (#14)

+44 -7
blockstore/blockstore.go
···
db *gorm.DB
did string
readonly bool
-
inserts []blocks.Block
+
inserts map[cid.Cid]blocks.Block
}
func New(did string, db *gorm.DB) *SqliteBlockstore {
···
did: did,
db: db,
readonly: false,
-
inserts: []blocks.Block{},
+
inserts: map[cid.Cid]blocks.Block{},
}
}
···
did: did,
db: db,
readonly: true,
-
inserts: []blocks.Block{},
+
inserts: map[cid.Cid]blocks.Block{},
}
}
func (bs *SqliteBlockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
var block models.Block
+
+
maybeBlock, ok := bs.inserts[cid]
+
if ok {
+
return maybeBlock, nil
+
}
+
if err := bs.db.Raw("SELECT * FROM blocks WHERE did = ? AND cid = ?", bs.did, cid.Bytes()).Scan(&block).Error; err != nil {
return nil, err
}
···
}
func (bs *SqliteBlockstore) Put(ctx context.Context, block blocks.Block) error {
-
bs.inserts = append(bs.inserts, block)
+
bs.inserts[block.Cid()] = block
if bs.readonly {
return nil
···
panic("not implemented")
}
-
func (bs *SqliteBlockstore) PutMany(context.Context, []blocks.Block) error {
-
panic("not implemented")
+
func (bs *SqliteBlockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
+
tx := bs.db.Begin()
+
+
for _, block := range blocks {
+
bs.inserts[block.Cid()] = block
+
+
if bs.readonly {
+
continue
+
}
+
+
b := models.Block{
+
Did: bs.did,
+
Cid: block.Cid().Bytes(),
+
Rev: syntax.NewTIDNow(0).String(), // TODO: WARN, this is bad. don't do this
+
Value: block.RawData(),
+
}
+
+
if err := tx.Clauses(clause.OnConflict{
+
Columns: []clause.Column{{Name: "did"}, {Name: "cid"}},
+
UpdateAll: true,
+
}).Create(&b).Error; err != nil {
+
tx.Rollback()
+
return err
+
}
+
}
+
+
if bs.readonly {
+
return nil
+
}
+
+
tx.Commit()
+
+
return nil
}
func (bs *SqliteBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
···
return nil
}
-
func (bs *SqliteBlockstore) GetLog() []blocks.Block {
+
func (bs *SqliteBlockstore) GetLog() map[cid.Cid]blocks.Block {
return bs.inserts
}
+116
server/handle_import_repo.go
···
+
package server
+
+
import (
+
"bytes"
+
"context"
+
"io"
+
"slices"
+
"strings"
+
+
"github.com/bluesky-social/indigo/atproto/syntax"
+
"github.com/bluesky-social/indigo/repo"
+
"github.com/haileyok/cocoon/blockstore"
+
"github.com/haileyok/cocoon/internal/helpers"
+
"github.com/haileyok/cocoon/models"
+
blocks "github.com/ipfs/go-block-format"
+
"github.com/ipfs/go-cid"
+
"github.com/ipld/go-car"
+
"github.com/labstack/echo/v4"
+
)
+
+
func (s *Server) handleRepoImportRepo(e echo.Context) error {
+
urepo := e.Get("repo").(*models.RepoActor)
+
+
b, err := io.ReadAll(e.Request().Body)
+
if err != nil {
+
s.logger.Error("could not read bytes in import request", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
bs := blockstore.New(urepo.Repo.Did, s.db)
+
+
cs, err := car.NewCarReader(bytes.NewReader(b))
+
if err != nil {
+
s.logger.Error("could not read car in import request", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
orderedBlocks := []blocks.Block{}
+
currBlock, err := cs.Next()
+
if err != nil {
+
s.logger.Error("could not get first block from car", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
currBlockCt := 1
+
+
for currBlock != nil {
+
s.logger.Info("someone is importing their repo", "block", currBlockCt)
+
orderedBlocks = append(orderedBlocks, currBlock)
+
next, _ := cs.Next()
+
currBlock = next
+
currBlockCt++
+
}
+
+
slices.Reverse(orderedBlocks)
+
+
if err := bs.PutMany(context.TODO(), orderedBlocks); err != nil {
+
s.logger.Error("could not insert blocks", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0])
+
if err != nil {
+
s.logger.Error("could not open repo", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
tx := s.db.Begin()
+
+
clock := syntax.NewTIDClock(0)
+
+
if err := r.ForEach(context.TODO(), "", func(key string, cid cid.Cid) error {
+
pts := strings.Split(key, "/")
+
nsid := pts[0]
+
rkey := pts[1]
+
cidStr := cid.String()
+
b, err := bs.Get(context.TODO(), cid)
+
if err != nil {
+
s.logger.Error("record bytes don't exist in blockstore", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
rec := models.Record{
+
Did: urepo.Repo.Did,
+
CreatedAt: clock.Next().String(),
+
Nsid: nsid,
+
Rkey: rkey,
+
Cid: cidStr,
+
Value: b.RawData(),
+
}
+
+
if err := tx.Create(rec).Error; err != nil {
+
return err
+
}
+
+
return nil
+
}); err != nil {
+
tx.Rollback()
+
s.logger.Error("record bytes don't exist in blockstore", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
tx.Commit()
+
+
root, rev, err := r.Commit(context.TODO(), urepo.SignFor)
+
if err != nil {
+
s.logger.Error("error committing", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
if err := bs.UpdateRepo(context.TODO(), root, rev); err != nil {
+
s.logger.Error("error updating repo after commit", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
+
return nil
+
}
+62 -42
server/handle_server_create_account.go
···
"github.com/Azure/go-autorest/autorest/to"
"github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/atproto/crypto"
+
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/repo"
"github.com/bluesky-social/indigo/util"
···
func (s *Server) handleCreateAccount(e echo.Context) error {
var request ComAtprotoServerCreateAccountRequest
+
+
var signupDid string
+
customDidHeader := e.Request().Header.Get("authorization")
+
if customDidHeader != "" {
+
pts := strings.Split(customDidHeader, " ")
+
if len(pts) != 2 {
+
return helpers.InputError(e, to.StringPtr("InvalidDid"))
+
}
+
+
_, err := syntax.ParseDID(pts[1])
+
if err != nil {
+
return helpers.InputError(e, to.StringPtr("InvalidDid"))
+
}
+
+
signupDid = pts[1]
+
}
if err := e.Bind(&request); err != nil {
s.logger.Error("error receiving request", "endpoint", "com.atproto.server.createAccount", "error", err)
···
// TODO: unsupported domains
-
// TODO: did stuff
-
k, err := crypto.GeneratePrivateKeyK256()
if err != nil {
s.logger.Error("error creating signing key", "endpoint", "com.atproto.server.createAccount", "error", err)
return helpers.ServerError(e, nil)
}
-
did, op, err := s.plcClient.CreateDID(k, "", request.Handle)
-
if err != nil {
-
s.logger.Error("error creating operation", "endpoint", "com.atproto.server.createAccount", "error", err)
-
return helpers.ServerError(e, nil)
-
}
+
if signupDid == "" {
+
did, op, err := s.plcClient.CreateDID(k, "", request.Handle)
+
if err != nil {
+
s.logger.Error("error creating operation", "endpoint", "com.atproto.server.createAccount", "error", err)
+
return helpers.ServerError(e, nil)
+
}
-
if err := s.plcClient.SendOperation(e.Request().Context(), did, op); err != nil {
-
s.logger.Error("error sending plc op", "endpoint", "com.atproto.server.createAccount", "error", err)
-
return helpers.ServerError(e, nil)
+
if err := s.plcClient.SendOperation(e.Request().Context(), did, op); err != nil {
+
s.logger.Error("error sending plc op", "endpoint", "com.atproto.server.createAccount", "error", err)
+
return helpers.ServerError(e, nil)
+
}
+
signupDid = did
}
hashed, err := bcrypt.GenerateFromPassword([]byte(request.Password), 10)
···
}
urepo := models.Repo{
-
Did: did,
+
Did: signupDid,
CreatedAt: time.Now(),
Email: request.Email,
EmailVerificationCode: to.StringPtr(fmt.Sprintf("%s-%s", helpers.RandomVarchar(6), helpers.RandomVarchar(6))),
···
}
actor := models.Actor{
-
Did: did,
+
Did: signupDid,
Handle: request.Handle,
}
···
return helpers.ServerError(e, nil)
}
-
bs := blockstore.New(did, s.db)
-
r := repo.NewRepo(context.TODO(), did, bs)
-
-
root, rev, err := r.Commit(context.TODO(), urepo.SignFor)
-
if err != nil {
-
s.logger.Error("error committing", "error", err)
+
if err := s.db.Create(&actor).Error; err != nil {
+
s.logger.Error("error inserting new actor", "error", err)
return helpers.ServerError(e, nil)
}
-
if err := bs.UpdateRepo(context.TODO(), root, rev); err != nil {
-
s.logger.Error("error updating repo after commit", "error", err)
-
return helpers.ServerError(e, nil)
-
}
+
if customDidHeader == "" {
+
bs := blockstore.New(signupDid, s.db)
+
r := repo.NewRepo(context.TODO(), signupDid, bs)
-
s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
-
RepoHandle: &atproto.SyncSubscribeRepos_Handle{
-
Did: urepo.Did,
-
Handle: request.Handle,
-
Seq: time.Now().UnixMicro(), // TODO: no
-
Time: time.Now().Format(util.ISO8601),
-
},
-
})
+
root, rev, err := r.Commit(context.TODO(), urepo.SignFor)
+
if err != nil {
+
s.logger.Error("error committing", "error", err)
+
return helpers.ServerError(e, nil)
+
}
-
s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
-
RepoIdentity: &atproto.SyncSubscribeRepos_Identity{
-
Did: urepo.Did,
-
Handle: to.StringPtr(request.Handle),
-
Seq: time.Now().UnixMicro(), // TODO: no
-
Time: time.Now().Format(util.ISO8601),
-
},
-
})
+
if err := bs.UpdateRepo(context.TODO(), root, rev); err != nil {
+
s.logger.Error("error updating repo after commit", "error", err)
+
return helpers.ServerError(e, nil)
+
}
-
if err := s.db.Create(&actor).Error; err != nil {
-
s.logger.Error("error inserting new actor", "error", err)
-
return helpers.ServerError(e, nil)
+
s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
+
RepoHandle: &atproto.SyncSubscribeRepos_Handle{
+
Did: urepo.Did,
+
Handle: request.Handle,
+
Seq: time.Now().UnixMicro(), // TODO: no
+
Time: time.Now().Format(util.ISO8601),
+
},
+
})
+
+
s.evtman.AddEvent(context.TODO(), &events.XRPCStreamEvent{
+
RepoIdentity: &atproto.SyncSubscribeRepos_Identity{
+
Did: urepo.Did,
+
Handle: to.StringPtr(request.Handle),
+
Seq: time.Now().UnixMicro(), // TODO: no
+
Time: time.Now().Format(util.ISO8601),
+
},
+
})
}
if err := s.db.Raw("UPDATE invite_codes SET remaining_use_count = remaining_use_count - 1 WHERE code = ?", request.InviteCode).Scan(&ic).Error; err != nil {
···
AccessJwt: sess.AccessToken,
RefreshJwt: sess.RefreshToken,
Handle: request.Handle,
-
Did: did,
+
Did: signupDid,
})
}
+7 -2
server/server.go
···
if err := next(e); err != nil {
e.Error(err)
}
-
+
return nil
}
}
···
httpd := &http.Server{
Addr: args.Addr,
Handler: e,
+
// shitty defaults but okay for now, needed for import repo
+
ReadTimeout: 5 * time.Minute,
+
WriteTimeout: 5 * time.Minute,
+
IdleTimeout: 5 * time.Minute,
}
db, err := gorm.Open(sqlite.Open("cocoon.db"), &gorm.Config{})
···
s.echo.POST("/xrpc/com.atproto.repo.deleteRecord", s.handleDeleteRecord, s.handleSessionMiddleware)
s.echo.POST("/xrpc/com.atproto.repo.applyWrites", s.handleApplyWrites, s.handleSessionMiddleware)
s.echo.POST("/xrpc/com.atproto.repo.uploadBlob", s.handleRepoUploadBlob, s.handleSessionMiddleware)
+
s.echo.POST("/xrpc/com.atproto.repo.importRepo", s.handleRepoImportRepo, s.handleSessionMiddleware)
// stupid silly endpoints
s.echo.GET("/xrpc/app.bsky.actor.getPreferences", s.handleActorGetPreferences, s.handleSessionMiddleware)
···
// are there any routes that we should be allowing without auth? i dont think so but idk
s.echo.GET("/xrpc/*", s.handleProxy, s.handleSessionMiddleware)
s.echo.POST("/xrpc/*", s.handleProxy, s.handleSessionMiddleware)
-
+
// admin routes
s.echo.POST("/xrpc/com.atproto.server.createInviteCode", s.handleCreateInviteCode, s.handleAdminMiddleware)
s.echo.POST("/xrpc/com.atproto.server.createInviteCodes", s.handleCreateInviteCodes, s.handleAdminMiddleware)