An atproto PDS written in Go

Compare changes

Choose any two refs to compare.

+1 -1
models/models.go
···
Did string `gorm:"index;index:idx_blob_did_cid"`
Cid []byte `gorm:"index;index:idx_blob_did_cid"`
RefCount int
-
Storage string `gorm:"default:sqlite"`
}
type BlobPart struct {
···
Did string `gorm:"index;index:idx_blob_did_cid"`
Cid []byte `gorm:"index;index:idx_blob_did_cid"`
RefCount int
+
Storage string `gorm:"default:sqlite;check:storage in ('sqlite', 's3')"`
}
type BlobPart struct {
+1 -6
server/handle_sync_get_blob.go
···
for _, p := range parts {
buf.Write(p.Data)
}
-
} else if blob.Storage == "s3" {
-
if !(s.s3Config != nil && s.s3Config.BlobstoreEnabled) {
-
s.logger.Error("s3 storage disabled")
-
return helpers.ServerError(e, nil)
-
}
-
config := &aws.Config{
Region: aws.String(s.s3Config.Region),
Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""),
···
for _, p := range parts {
buf.Write(p.Data)
}
+
} else if blob.Storage == "s3" && s.s3Config != nil && s.s3Config.BlobstoreEnabled {
config := &aws.Config{
Region: aws.String(s.s3Config.Region),
Credentials: credentials.NewStaticCredentials(s.s3Config.AccessKey, s.s3Config.SecretKey, ""),
+11 -31
server/handle_sync_subscribe_repos.go
···
package server
import (
-
"context"
-
"time"
"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/lex/util"
···
)
func (s *Server) handleSyncSubscribeRepos(e echo.Context) error {
-
ctx := e.Request().Context()
-
logger := s.logger.With("component", "subscribe-repos-websocket")
-
conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10)
if err != nil {
-
logger.Error("unable to establish websocket with relay", "err", err)
return err
}
ident := e.RealIP() + "-" + e.Request().UserAgent()
-
logger = logger.With("ident", ident)
-
logger.Info("new connection established")
evts, cancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
return true
···
for evt := range evts {
wc, err := conn.NextWriter(websocket.BinaryMessage)
if err != nil {
-
logger.Error("error writing message to relay", "err", err)
-
break
-
}
-
-
if ctx.Err() != nil {
-
logger.Error("context error", "err", err)
-
break
}
var obj util.CBOR
switch {
case evt.Error != nil:
header.Op = events.EvtKindErrorFrame
···
header.MsgType = "#info"
obj = evt.RepoInfo
default:
-
logger.Warn("unrecognized event kind")
-
return nil
}
if err := header.MarshalCBOR(wc); err != nil {
-
logger.Error("failed to write header to relay", "err", err)
-
break
}
if err := obj.MarshalCBOR(wc); err != nil {
-
logger.Error("failed to write event to relay", "err", err)
-
break
}
if err := wc.Close(); err != nil {
-
logger.Error("failed to flush-close our event write", "err", err)
-
break
}
-
}
-
-
// we should tell the relay to request a new crawl at this point if we got disconnected
-
// use a new context since the old one might be cancelled at this point
-
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
-
defer cancel()
-
if err := s.requestCrawl(ctx); err != nil {
-
logger.Error("error requesting crawls", "err", err)
}
return nil
···
package server
import (
+
"fmt"
"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/lex/util"
···
)
func (s *Server) handleSyncSubscribeRepos(e echo.Context) error {
conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10)
if err != nil {
return err
}
+
+
s.logger.Info("new connection", "ua", e.Request().UserAgent())
+
+
ctx := e.Request().Context()
ident := e.RealIP() + "-" + e.Request().UserAgent()
evts, cancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
return true
···
for evt := range evts {
wc, err := conn.NextWriter(websocket.BinaryMessage)
if err != nil {
+
return err
}
var obj util.CBOR
+
switch {
case evt.Error != nil:
header.Op = events.EvtKindErrorFrame
···
header.MsgType = "#info"
obj = evt.RepoInfo
default:
+
return fmt.Errorf("unrecognized event kind")
}
if err := header.MarshalCBOR(wc); err != nil {
+
return fmt.Errorf("failed to write header: %w", err)
}
if err := obj.MarshalCBOR(wc); err != nil {
+
return fmt.Errorf("failed to write event: %w", err)
}
if err := wc.Close(); err != nil {
+
return fmt.Errorf("failed to flush-close our event write: %w", err)
}
}
return nil
+5 -36
server/server.go
···
passport *identity.Passport
fallbackProxy string
-
lastRequestCrawl time.Time
-
requestCrawlMu sync.Mutex
-
dbName string
s3Config *S3Config
}
···
go s.backupRoutine()
-
go func() {
-
if err := s.requestCrawl(ctx); err != nil {
-
s.logger.Error("error requesting crawls", "err", err)
-
}
-
}()
-
-
<-ctx.Done()
-
-
fmt.Println("shut down")
-
-
return nil
-
}
-
-
func (s *Server) requestCrawl(ctx context.Context) error {
-
logger := s.logger.With("component", "request-crawl")
-
s.requestCrawlMu.Lock()
-
defer s.requestCrawlMu.Unlock()
-
-
logger.Info("requesting crawl with configured relays")
-
-
if time.Now().Sub(s.lastRequestCrawl) <= 1*time.Minute {
-
return fmt.Errorf("a crawl request has already been made within the last minute")
-
}
-
for _, relay := range s.config.Relays {
-
logger := logger.With("relay", relay)
-
logger.Info("requesting crawl from relay")
cli := xrpc.Client{Host: relay}
-
if err := atproto.SyncRequestCrawl(ctx, &cli, &atproto.SyncRequestCrawl_Input{
Hostname: s.config.Hostname,
-
}); err != nil {
-
logger.Error("error requesting crawl", "err", err)
-
} else {
-
logger.Info("crawl requested successfully")
-
}
}
-
s.lastRequestCrawl = time.Now()
return nil
}
···
passport *identity.Passport
fallbackProxy string
dbName string
s3Config *S3Config
}
···
go s.backupRoutine()
for _, relay := range s.config.Relays {
cli := xrpc.Client{Host: relay}
+
atproto.SyncRequestCrawl(ctx, &cli, &atproto.SyncRequestCrawl_Input{
Hostname: s.config.Hostname,
+
})
}
+
<-ctx.Done()
+
+
fmt.Println("shut down")
return nil
}