An atproto PDS written in Go

Compare changes

+11 -6
cmd/cocoon/main.go
···
 			Name:    "s3-backups-enabled",
 			EnvVars: []string{"COCOON_S3_BACKUPS_ENABLED"},
 		},
+		&cli.BoolFlag{
+			Name:    "s3-blobstore-enabled",
+			EnvVars: []string{"COCOON_S3_BLOBSTORE_ENABLED"},
+		},
 		&cli.StringFlag{
 			Name:    "s3-region",
 			EnvVars: []string{"COCOON_S3_REGION"},
···
 		SmtpEmail: cmd.String("smtp-email"),
 		SmtpName:  cmd.String("smtp-name"),
 		S3Config: &server.S3Config{
-			BackupsEnabled: cmd.Bool("s3-backups-enabled"),
-			Region:         cmd.String("s3-region"),
-			Bucket:         cmd.String("s3-bucket"),
-			Endpoint:       cmd.String("s3-endpoint"),
-			AccessKey:      cmd.String("s3-access-key"),
-			SecretKey:      cmd.String("s3-secret-key"),
+			BackupsEnabled:   cmd.Bool("s3-backups-enabled"),
+			BlobstoreEnabled: cmd.Bool("s3-blobstore-enabled"),
+			Region:           cmd.String("s3-region"),
+			Bucket:           cmd.String("s3-bucket"),
+			Endpoint:         cmd.String("s3-endpoint"),
+			AccessKey:        cmd.String("s3-access-key"),
+			SecretKey:        cmd.String("s3-secret-key"),
 		},
 		SessionSecret:     cmd.String("session-secret"),
 		BlockstoreVariant: server.MustReturnBlockstoreVariant(cmd.String("blockstore-variant")),
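Nothing in this file checks that the remaining S3 settings are present once the new toggle is on; a minimal startup guard could look like the sketch below. The helper name validateS3Config and its placement are hypothetical; the S3Config fields are the ones wired above.

package main

import (
	"fmt"

	"github.com/haileyok/cocoon/server"
)

// validateS3Config is a hypothetical guard: if the S3 blobstore is enabled,
// refuse to start without the settings the upload handler will need.
func validateS3Config(cfg *server.S3Config) error {
	if cfg == nil || !cfg.BlobstoreEnabled {
		return nil
	}
	if cfg.Bucket == "" || cfg.AccessKey == "" || cfg.SecretKey == "" {
		return fmt.Errorf("s3 blobstore enabled but bucket or credentials are missing")
	}
	return nil
}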
+1
models/models.go
···
 	Did      string `gorm:"index;index:idx_blob_did_cid"`
 	Cid      []byte `gorm:"index;index:idx_blob_did_cid"`
 	RefCount int
+	Storage  string `gorm:"default:sqlite;check:storage in ('sqlite', 's3')"`
 }

 type BlobPart struct {
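The new column is a per-row storage discriminator: the GORM tag gives it a 'sqlite' default plus a CHECK constraint, so blobs written before this change keep resolving to the sqlite blob_parts path after migration. A hedged sketch of querying the discriminator with stock GORM (cocoon's own db wrapper differs slightly; the helper is hypothetical):

package main

import (
	"github.com/haileyok/cocoon/models"
	"gorm.io/gorm"
)

// countLegacyBlobs is a hypothetical check: rows created before the column
// existed carry the default value and are still served from blob_parts.
func countLegacyBlobs(db *gorm.DB) (int64, error) {
	var n int64
	err := db.Model(&models.Blob{}).Where("storage = ?", "sqlite").Count(&n).Error
	return n, err
}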
+50 -8
server/handle_repo_upload_blob.go
···
 import (
 	"bytes"
+	"fmt"
 	"io"

+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/credentials"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/haileyok/cocoon/internal/helpers"
 	"github.com/haileyok/cocoon/models"
 	"github.com/ipfs/go-cid"
···
 		mime = "application/octet-stream"
 	}
+	storage := "sqlite"
+	s3Upload := s.s3Config != nil && s.s3Config.BlobstoreEnabled
+	if s3Upload {
+		storage = "s3"
+	}
 	blob := models.Blob{
 		Did:       urepo.Repo.Did,
 		RefCount:  0,
 		CreatedAt: s.repoman.clock.Next().String(),
+		Storage:   storage,
 	}
 	if err := s.db.Create(&blob, nil).Error; err != nil {
···
 		read += n
 		fulldata.Write(data)
-		blobPart := models.BlobPart{
-			BlobID: blob.ID,
-			Idx:    part,
-			Data:   data,
-		}
-		if err := s.db.Create(&blobPart, nil).Error; err != nil {
-			s.logger.Error("error adding blob part to db", "error", err)
-			return helpers.ServerError(e, nil)
+		if !s3Upload {
+			blobPart := models.BlobPart{
+				BlobID: blob.ID,
+				Idx:    part,
+				Data:   data,
+			}
+			if err := s.db.Create(&blobPart, nil).Error; err != nil {
+				s.logger.Error("error adding blob part to db", "error", err)
+				return helpers.ServerError(e, nil)
+			}
 		}
 		part++
···
 	if err != nil {
 		s.logger.Error("error creating cid prefix", "error", err)
 		return helpers.ServerError(e, nil)
+	}
+
+	if s3Upload {
+		config := &aws.Config{
+			Region:      aws.String(s.s3Config.Region),
+			Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""),
+		}
+
+		if s.s3Config.Endpoint != "" {
+			config.Endpoint = aws.String(s.s3Config.Endpoint)
+			config.S3ForcePathStyle = aws.Bool(true)
+		}
+
+		sess, err := session.NewSession(config)
+		if err != nil {
+			s.logger.Error("error creating aws session", "error", err)
+			return helpers.ServerError(e, nil)
+		}
+
+		svc := s3.New(sess)
+
+		if _, err := svc.PutObject(&s3.PutObjectInput{
+			Bucket: aws.String(s.s3Config.Bucket),
+			Key:    aws.String(fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String())),
+			Body:   bytes.NewReader(fulldata.Bytes()),
+		}); err != nil {
+			s.logger.Error("error uploading blob to s3", "error", err)
+			return helpers.ServerError(e, nil)
+		}
 	}
 	if err := s.db.Exec("UPDATE blobs SET cid = ? WHERE id = ?", nil, c.Bytes(), blob.ID).Error; err != nil {
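The handler builds a fresh AWS session on every upload, and the same construction is repeated in handle_sync_get_blob.go below. A sketch of factoring it into a shared helper; the name newS3Client is hypothetical, while the aws-sdk-go calls are the ones the diff already uses:

package server

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

// newS3Client builds an S3 client from the server's S3Config. Custom
// endpoints (e.g. MinIO) generally need path-style addressing, which is
// what the S3ForcePathStyle toggle in both handlers is for.
func (s *Server) newS3Client() (*s3.S3, error) {
	config := &aws.Config{
		Region:      aws.String(s.s3Config.Region),
		Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""),
	}
	if s.s3Config.Endpoint != "" {
		config.Endpoint = aws.String(s.s3Config.Endpoint)
		config.S3ForcePathStyle = aws.Bool(true)
	}
	sess, err := session.NewSession(config)
	if err != nil {
		return nil, err
	}
	return s3.New(sess), nil
}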
+65 -8
server/handle_sync_get_blob.go
···
 import (
 	"bytes"
+	"fmt"
+	"io"

 	"github.com/Azure/go-autorest/autorest/to"
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/credentials"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/haileyok/cocoon/internal/helpers"
 	"github.com/haileyok/cocoon/models"
 	"github.com/ipfs/go-cid"
···
 	buf := new(bytes.Buffer)
-	var parts []models.BlobPart
-	if err := s.db.Raw("SELECT * FROM blob_parts WHERE blob_id = ? ORDER BY idx", nil, blob.ID).Scan(&parts).Error; err != nil {
-		s.logger.Error("error getting blob parts", "error", err)
-		return helpers.ServerError(e, nil)
-	}
-	// TODO: we can just stream this, don't need to make a buffer
-	for _, p := range parts {
-		buf.Write(p.Data)
+	if blob.Storage == "sqlite" {
+		var parts []models.BlobPart
+		if err := s.db.Raw("SELECT * FROM blob_parts WHERE blob_id = ? ORDER BY idx", nil, blob.ID).Scan(&parts).Error; err != nil {
+			s.logger.Error("error getting blob parts", "error", err)
+			return helpers.ServerError(e, nil)
+		}
+
+		// TODO: we can just stream this, don't need to make a buffer
+		for _, p := range parts {
+			buf.Write(p.Data)
+		}
+	} else if blob.Storage == "s3" && s.s3Config != nil && s.s3Config.BlobstoreEnabled {
+		config := &aws.Config{
+			Region:      aws.String(s.s3Config.Region),
+			Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""),
+		}
+
+		if s.s3Config.Endpoint != "" {
+			config.Endpoint = aws.String(s.s3Config.Endpoint)
+			config.S3ForcePathStyle = aws.Bool(true)
+		}
+
+		sess, err := session.NewSession(config)
+		if err != nil {
+			s.logger.Error("error creating aws session", "error", err)
+			return helpers.ServerError(e, nil)
+		}
+
+		svc := s3.New(sess)
+		if result, err := svc.GetObject(&s3.GetObjectInput{
+			Bucket: aws.String(s.s3Config.Bucket),
+			Key:    aws.String(fmt.Sprintf("blobs/%s/%s", urepo.Repo.Did, c.String())),
+		}); err != nil {
+			s.logger.Error("error getting blob from s3", "error", err)
+			return helpers.ServerError(e, nil)
+		} else {
+			read := 0
+			part := 0
+			partBuf := make([]byte, 0x10000)
+			for {
+				n, err := io.ReadFull(result.Body, partBuf)
+				if err == io.ErrUnexpectedEOF || err == io.EOF {
+					if n == 0 {
+						break
+					}
+				} else if err != nil && err != io.ErrUnexpectedEOF {
+					s.logger.Error("error reading blob", "error", err)
+					return helpers.ServerError(e, nil)
+				}
+
+				data := partBuf[:n]
+				read += n
+				buf.Write(data)
+				part++
+			}
+		}
+	} else {
+		s.logger.Error("unknown storage", "storage", blob.Storage)
+		return helpers.ServerError(e, nil)
 	}
 	e.Response().Header().Set(echo.HeaderContentDisposition, "attachment; filename="+c.String())
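Since the loop above buffers the whole object into memory before responding, the chunked io.ReadFull dance is behaviorally a single io.Copy; note also that nothing in the handler closes result.Body, which is an io.ReadCloser. A minimal equivalent under the same buffer-everything approach (the helper is hypothetical):

package server

import (
	"bytes"
	"io"
)

// drainBody copies the S3 object body into the response buffer in one call,
// then closes it; equivalent to the manual 64 KiB read loop above.
func drainBody(buf *bytes.Buffer, body io.ReadCloser) error {
	defer body.Close()
	_, err := io.Copy(buf, body)
	return err
}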
+11 -31
server/handle_sync_subscribe_repos.go
···
 package server

 import (
-	"context"
-	"time"
+	"fmt"

 	"github.com/bluesky-social/indigo/events"
 	"github.com/bluesky-social/indigo/lex/util"
···
 )

 func (s *Server) handleSyncSubscribeRepos(e echo.Context) error {
-	ctx := e.Request().Context()
-	logger := s.logger.With("component", "subscribe-repos-websocket")
-
 	conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10)
 	if err != nil {
-		logger.Error("unable to establish websocket with relay", "err", err)
 		return err
 	}
+
+	s.logger.Info("new connection", "ua", e.Request().UserAgent())
+
+	ctx := e.Request().Context()
 	ident := e.RealIP() + "-" + e.Request().UserAgent()
-	logger = logger.With("ident", ident)
-	logger.Info("new connection established")
 	evts, cancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
 		return true
···
 	for evt := range evts {
 		wc, err := conn.NextWriter(websocket.BinaryMessage)
 		if err != nil {
-			logger.Error("error writing message to relay", "err", err)
-			break
-		}
-
-		if ctx.Err() != nil {
-			logger.Error("context error", "err", err)
-			break
+			return err
 		}
 		var obj util.CBOR
+
 		switch {
 		case evt.Error != nil:
 			header.Op = events.EvtKindErrorFrame
···
 			header.MsgType = "#info"
 			obj = evt.RepoInfo
 		default:
-			logger.Warn("unrecognized event kind")
-			return nil
+			return fmt.Errorf("unrecognized event kind")
 		}
 		if err := header.MarshalCBOR(wc); err != nil {
-			logger.Error("failed to write header to relay", "err", err)
-			break
+			return fmt.Errorf("failed to write header: %w", err)
 		}
 		if err := obj.MarshalCBOR(wc); err != nil {
-			logger.Error("failed to write event to relay", "err", err)
-			break
+			return fmt.Errorf("failed to write event: %w", err)
 		}
 		if err := wc.Close(); err != nil {
-			logger.Error("failed to flush-close our event write", "err", err)
-			break
+			return fmt.Errorf("failed to flush-close our event write: %w", err)
 		}
-	}
-
-	// we should tell the relay to request a new crawl at this point if we got disconnected
-	// use a new context since the old one might be cancelled at this point
-	ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
-	defer cancel()
-	if err := s.requestCrawl(ctx); err != nil {
-		logger.Error("error requesting crawls", "err", err)
 	}
 	return nil
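Each event goes out as one binary websocket message: the CBOR header first, then the CBOR body, with Close flushing the frame. Pulled into a helper for illustration (the helper itself is hypothetical; the types are the indigo and gorilla ones the handler already uses):

package server

import (
	"fmt"

	"github.com/bluesky-social/indigo/events"
	"github.com/bluesky-social/indigo/lex/util"
	"github.com/gorilla/websocket"
)

// writeFrame writes header then body into a single binary message; closing
// the writer flushes the frame to the peer.
func writeFrame(conn *websocket.Conn, header *events.EventHeader, obj util.CBOR) error {
	wc, err := conn.NextWriter(websocket.BinaryMessage)
	if err != nil {
		return err
	}
	if err := header.MarshalCBOR(wc); err != nil {
		return fmt.Errorf("failed to write header: %w", err)
	}
	if err := obj.MarshalCBOR(wc); err != nil {
		return fmt.Errorf("failed to write event: %w", err)
	}
	return wc.Close()
}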
+12 -42
server/server.go
···
)
type S3Config struct {
-
BackupsEnabled bool
-
Endpoint string
-
Region string
-
Bucket string
-
AccessKey string
-
SecretKey string
}
type Server struct {
···
evtman *events.EventManager
passport *identity.Passport
fallbackProxy string
-
-
lastRequestCrawl time.Time
-
requestCrawlMu sync.Mutex
dbName string
s3Config *S3Config
···
go s.backupRoutine()
-
go func() {
-
if err := s.requestCrawl(ctx); err != nil {
-
s.logger.Error("error requesting crawls", "err", err)
-
}
-
}()
-
-
<-ctx.Done()
-
-
fmt.Println("shut down")
-
-
return nil
-
}
-
-
func (s *Server) requestCrawl(ctx context.Context) error {
-
logger := s.logger.With("component", "request-crawl")
-
s.requestCrawlMu.Lock()
-
defer s.requestCrawlMu.Unlock()
-
-
logger.Info("requesting crawl with configured relays")
-
-
if time.Now().Sub(s.lastRequestCrawl) <= 1*time.Minute {
-
return fmt.Errorf("a crawl request has already been made within the last minute")
-
}
-
for _, relay := range s.config.Relays {
-
logger := logger.With("relay", relay)
-
logger.Info("requesting crawl from relay")
cli := xrpc.Client{Host: relay}
-
if err := atproto.SyncRequestCrawl(ctx, &cli, &atproto.SyncRequestCrawl_Input{
Hostname: s.config.Hostname,
-
}); err != nil {
-
logger.Error("error requesting crawl", "err", err)
-
} else {
-
logger.Info("crawl requested successfully")
-
}
}
-
s.lastRequestCrawl = time.Now()
return nil
}
···
)
type S3Config struct {
+
BackupsEnabled bool
+
BlobstoreEnabled bool
+
Endpoint string
+
Region string
+
Bucket string
+
AccessKey string
+
SecretKey string
}
type Server struct {
···
evtman *events.EventManager
passport *identity.Passport
fallbackProxy string
dbName string
s3Config *S3Config
···
go s.backupRoutine()
for _, relay := range s.config.Relays {
cli := xrpc.Client{Host: relay}
+
atproto.SyncRequestCrawl(ctx, &cli, &atproto.SyncRequestCrawl_Input{
Hostname: s.config.Hostname,
+
})
}
+
<-ctx.Done()
+
+
fmt.Println("shut down")
return nil
}
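As written, Run now fires one SyncRequestCrawl per configured relay at startup and discards the returned error. A hedged variant that keeps the fire-and-forget shape but still surfaces per-relay failures in the log (the extracted helper is hypothetical; the indigo client calls match the diff):

package server

import (
	"context"

	"github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/xrpc"
)

// requestCrawls asks each configured relay to crawl this PDS, logging
// failures instead of dropping them.
func (s *Server) requestCrawls(ctx context.Context) {
	for _, relay := range s.config.Relays {
		cli := xrpc.Client{Host: relay}
		if err := atproto.SyncRequestCrawl(ctx, &cli, &atproto.SyncRequestCrawl_Input{
			Hostname: s.config.Hostname,
		}); err != nil {
			s.logger.Error("error requesting crawl", "relay", relay, "err", err)
		}
	}
}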