an app.bsky.* indexer

Clean up shutdown handling

Changed files
+91 -40
cmd
+12 -1
cmd/backfiller/cursors.go
···
package main
import (
+
"context"
"errors"
"fmt"
"log/slog"
···
return nil
}
-
func (b *Backend) SyncCursors() {
+
func (b *Backend) SyncCursors(ctx context.Context) error {
for range time.Tick(time.Second * 5) {
+
select {
+
case <-ctx.Done():
+
return nil
+
default:
+
//
+
}
+
if err := b.FlushCursors(); err != nil {
slog.Error("failed to flush cursors", "err", err)
+
return fmt.Errorf("failed to flush cursors: %w", err)
}
}
+
+
return nil
}
+7
cmd/backfiller/handlers.go
···
)
func (b *Backend) RepoCommitHandler(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
+
select {
+
case <-ctx.Done():
+
return nil
+
default:
+
//
+
}
+
b.firehoseLk.Lock()
b.firehoseSeq = strconv.Itoa(int(evt.Seq))
b.firehoseLk.Unlock()
+66 -24
cmd/backfiller/main.go
···
func NewDatabase() *gorm.DB {
sl := slog.With("source", "database")
-
newLogger := logger.New(
-
log.New(os.Stdout, "\n", log.LstdFlags),
+
log.New(os.Stdout, "\r\n", log.LstdFlags),
logger.Config{
SlowThreshold: 1 * time.Second,
Colorful: false,
···
}
db.AutoMigrate(&backfill.GormDBJob{})
db.AutoMigrate(&cursorRecord{})
+
return db
}
···
// SyncRequestsPerSecond: 25,
ParallelBackfills: 10,
-
ParallelRecordCreates: 5,
+
ParallelRecordCreates: 1, // sqlite
SyncRequestsPerSecond: 5,
RelayHost: "https://bsky.network",
}
+
return backfill.NewBackfiller(
"backfills",
backfill.NewGormstore(db),
···
)
}
-
func NewFirehose(backend *Backend) *websocket.Conn {
+
func NewFirehose(ctx context.Context, cursor string) *websocket.Conn {
sl := slog.With("source", "firehose")
-
subscribeUrl := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
-
-
cursor, err := backend.LoadCursor("firehose")
-
if err != nil {
-
sl.Error("failed loading firehose cursor", "err", err)
-
}
if cursor != "" {
subscribeUrl += "?cursor=" + cursor
}
-
conn, _, err := websocket.DefaultDialer.Dial(subscribeUrl, http.Header{
+
conn, _, err := websocket.DefaultDialer.DialContext(ctx, subscribeUrl, http.Header{
"User-Agent": []string{"backfiller/0.1 (@edavis.dev)"},
})
if err != nil {
sl.Error("failed to connect to relay", "err", err)
}
+
return conn
}
···
return backend.RepoCommitHandler(ctx, evt)
},
}
+
return parallel.NewScheduler(16, 100, "firehose", rsc.EventHandler)
}
func main() {
-
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
-
defer stop()
+
sl := slog.With("source", "backfiller")
-
sl := slog.With("source", "backfiller")
+
streamClosed := make(chan struct{})
+
streamCtx, streamCancel := context.WithCancel(context.Background())
db := NewDatabase()
···
go bf.Start()
backend := NewBackend(db, bf)
-
go backend.SyncCursors()
-
go backend.PumpRepos(ctx)
+
go backend.SyncCursors(streamCtx)
-
conn := NewFirehose(backend)
-
sched := NewScheduler(ctx, backend)
-
if err := events.HandleRepoStream(ctx, conn, sched, sl); err != nil {
-
sl.Error("failed to start scheduler", "err", err)
+
cursor, err := backend.LoadCursor("firehose")
+
if err != nil {
+
sl.Error("failed loading firehose cursor", "err", err)
}
-
<-ctx.Done()
+
conn := NewFirehose(streamCtx, cursor)
-
endctx, cancel := context.WithTimeout(ctx, time.Minute)
-
defer cancel()
-
bf.Stop(endctx)
+
sched := NewScheduler(streamCtx, backend)
+
go func() {
+
if err := events.HandleRepoStream(streamCtx, conn, sched, sl); err != nil {
+
sl.Error("failed to start scheduler", "err", err)
+
}
+
close(streamClosed)
+
}()
+
+
go func() {
+
if err := backend.PumpRepos(streamCtx); err != nil {
+
sl.Error("failed pumping repos", "err", err)
+
} else {
+
sl.Info("finished listing repos, switching over to event stream")
+
}
+
}()
+
+
quit := make(chan struct{})
+
exitSignals := make(chan os.Signal, 1)
+
signal.Notify(exitSignals, syscall.SIGINT, syscall.SIGTERM)
+
go func() {
+
select {
+
case sig := <-exitSignals:
+
sl.Info("received OS exit signal", "signal", sig)
+
case <-streamClosed:
+
//
+
}
+
+
conn.Close()
+
+
streamCancel()
+
<-streamClosed
+
+
time.Sleep(time.Millisecond * 100)
+
+
endctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
+
defer cancel()
+
bf.Stop(endctx)
+
+
if err := backend.FlushCursors(); err != nil {
+
sl.Error("final flush cursor failed", "err", err)
+
}
+
+
close(quit)
+
}()
+
+
<-quit
+
+
if err := backend.FlushCursors(); err != nil {
+
sl.Error("failed to flush cursors on close", "err", err)
+
}
}
+6 -15
cmd/backfiller/pump.go
···
func (b *Backend) PumpRepos(ctx context.Context) error {
sl := slog.With("source", "pumpRepos")
-
-
if err := pumpRepos(ctx, b); err != nil {
-
sl.Error("failed pumping repos", "err", err)
-
}
-
-
return nil
-
}
-
-
func pumpRepos(ctx context.Context, backend *Backend) error {
-
sl := slog.With("source", "pumpRepos")
-
bf := backend.bf
+
bf := b.bf
xrpcc := &xrpc.Client{
Host: "https://bsky.network",
···
return fmt.Errorf("configured job store doesn't support random job creation")
}
-
curs, err := backend.LoadCursor("repos")
+
curs, err := b.LoadCursor("repos")
if err != nil {
sl.Error("failed to load repos cursor", "err", err)
}
···
sl.Info("stopping repo pump")
return nil
default:
+
//
}
sl.Info("listing repos", "cursor", curs)
···
if res.Cursor != nil && *res.Cursor != "" {
curs = *res.Cursor
-
backend.reposLk.Lock()
-
backend.reposSeq = curs
-
backend.reposLk.Unlock()
+
b.reposLk.Lock()
+
b.reposSeq = curs
+
b.reposLk.Unlock()
} else {
break
}