An atproto PDS written in Go

request crawl when websocket dies (#31)

Changed files
+67 -16
server
+31 -11
server/handle_sync_subscribe_repos.go
···
package server
import (
-
"fmt"
"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/lex/util"
···
)
func (s *Server) handleSyncSubscribeRepos(e echo.Context) error {
conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10)
if err != nil {
return err
}
-
-
s.logger.Info("new connection", "ua", e.Request().UserAgent())
-
-
ctx := e.Request().Context()
ident := e.RealIP() + "-" + e.Request().UserAgent()
evts, cancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
return true
···
for evt := range evts {
wc, err := conn.NextWriter(websocket.BinaryMessage)
if err != nil {
-
return err
}
-
var obj util.CBOR
switch {
case evt.Error != nil:
header.Op = events.EvtKindErrorFrame
···
header.MsgType = "#info"
obj = evt.RepoInfo
default:
-
return fmt.Errorf("unrecognized event kind")
}
if err := header.MarshalCBOR(wc); err != nil {
-
return fmt.Errorf("failed to write header: %w", err)
}
if err := obj.MarshalCBOR(wc); err != nil {
-
return fmt.Errorf("failed to write event: %w", err)
}
if err := wc.Close(); err != nil {
-
return fmt.Errorf("failed to flush-close our event write: %w", err)
}
}
return nil
···
package server
import (
+
"context"
+
"time"
"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/lex/util"
···
)
func (s *Server) handleSyncSubscribeRepos(e echo.Context) error {
+
ctx := e.Request().Context()
+
logger := s.logger.With("component", "subscribe-repos-websocket")
+
conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10)
if err != nil {
+
logger.Error("unable to establish websocket with relay", "err", err)
return err
}
ident := e.RealIP() + "-" + e.Request().UserAgent()
+
logger = logger.With("ident", ident)
+
logger.Info("new connection established")
evts, cancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
return true
···
for evt := range evts {
wc, err := conn.NextWriter(websocket.BinaryMessage)
if err != nil {
+
logger.Error("error writing message to relay", "err", err)
+
break
}
+
if ctx.Err() != nil {
+
logger.Error("context error", "err", err)
+
break
+
}
+
var obj util.CBOR
switch {
case evt.Error != nil:
header.Op = events.EvtKindErrorFrame
···
header.MsgType = "#info"
obj = evt.RepoInfo
default:
+
logger.Warn("unrecognized event kind")
+
return nil
}
if err := header.MarshalCBOR(wc); err != nil {
+
logger.Error("failed to write header to relay", "err", err)
+
break
}
if err := obj.MarshalCBOR(wc); err != nil {
+
logger.Error("failed to write event to relay", "err", err)
+
break
}
if err := wc.Close(); err != nil {
+
logger.Error("failed to flush-close our event write", "err", err)
+
break
}
+
}
+
+
// we should tell the relay to request a new crawl at this point if we got disconnected
+
// use a new context since the old one might be cancelled at this point
+
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
+
defer cancel()
+
if err := s.requestCrawl(ctx); err != nil {
+
logger.Error("error requesting crawls", "err", err)
}
return nil
+36 -5
server/server.go
···
passport *identity.Passport
fallbackProxy string
dbName string
s3Config *S3Config
}
···
go s.backupRoutine()
for _, relay := range s.config.Relays {
cli := xrpc.Client{Host: relay}
-
atproto.SyncRequestCrawl(ctx, &cli, &atproto.SyncRequestCrawl_Input{
Hostname: s.config.Hostname,
-
})
}
-
<-ctx.Done()
-
-
fmt.Println("shut down")
return nil
}
···
passport *identity.Passport
fallbackProxy string
+
lastRequestCrawl time.Time
+
requestCrawlMu sync.Mutex
+
dbName string
s3Config *S3Config
}
···
go s.backupRoutine()
+
go func() {
+
if err := s.requestCrawl(ctx); err != nil {
+
s.logger.Error("error requesting crawls", "err", err)
+
}
+
}()
+
+
<-ctx.Done()
+
+
fmt.Println("shut down")
+
+
return nil
+
}
+
+
func (s *Server) requestCrawl(ctx context.Context) error {
+
logger := s.logger.With("component", "request-crawl")
+
s.requestCrawlMu.Lock()
+
defer s.requestCrawlMu.Unlock()
+
+
logger.Info("requesting crawl with configured relays")
+
+
if time.Now().Sub(s.lastRequestCrawl) <= 1*time.Minute {
+
return fmt.Errorf("a crawl request has already been made within the last minute")
+
}
+
for _, relay := range s.config.Relays {
+
logger := logger.With("relay", relay)
+
logger.Info("requesting crawl from relay")
cli := xrpc.Client{Host: relay}
+
if err := atproto.SyncRequestCrawl(ctx, &cli, &atproto.SyncRequestCrawl_Input{
Hostname: s.config.Hostname,
+
}); err != nil {
+
logger.Error("error requesting crawl", "err", err)
+
} else {
+
logger.Info("crawl requested successfully")
+
}
}
+
s.lastRequestCrawl = time.Now()
return nil
}