package main

import (
	"context"
	"log/slog"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	comatproto "github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/backfill"
	"github.com/bluesky-social/indigo/events"
	"github.com/bluesky-social/indigo/events/schedulers/parallel"
	"github.com/gorilla/websocket"
	"gorm.io/gorm"
)

// NewBackfiller builds a backfill.Backfiller named "backfills" whose job state
// is kept in db through a Gorm-backed store. The create, update and delete
// handlers are handed to the backfiller unchanged.
func NewBackfiller(
	db *gorm.DB,
	create handleOpCreateUpdate,
	update handleOpCreateUpdate,
	delete handleOpDelete,
) *backfill.Backfiller {
	opts := &backfill.BackfillOptions{
		// ParallelBackfills:     50,
		// ParallelRecordCreates: 25,
		// SyncRequestsPerSecond: 25,

		ParallelBackfills:     10,
		ParallelRecordCreates: 1, // sqlite
		SyncRequestsPerSecond: 5,

		// NSIDFilter: "app.bsky.feed.generator",
		RelayHost: "https://bsky.network",
	}

	return backfill.NewBackfiller(
		"backfills",
		backfill.NewGormstore(db),
		create,
		update,
		delete,
		opts,
	)
}

// NewFirehose dials the relay's com.atproto.sync.subscribeRepos websocket
// endpoint. A non-empty cursor is appended as the "cursor" query parameter.
//
// The signature has no error return: when the dial fails the error is logged
// and nil is returned, so callers must check the result for nil before use.
func NewFirehose(ctx context.Context, cursor string) *websocket.Conn {
	sl := slog.With("source", "firehose")

	subscribeUrl := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
	if cursor != "" {
		subscribeUrl += "?cursor=" + cursor
	}

	conn, _, err := websocket.DefaultDialer.DialContext(ctx, subscribeUrl, http.Header{
		"User-Agent": []string{"backfiller/0.1 (@edavis.dev)"},
	})
	if err != nil {
		sl.Error("failed to connect to relay", "err", err)
		return nil
	}

	return conn
}

// NewScheduler returns a parallel scheduler identified as "firehose" that
// forwards every repo commit event to commitCallback together with ctx.
// Only the RepoCommit callback is registered.
func NewScheduler(
	ctx context.Context,
	commitCallback commitHandler,
) *parallel.Scheduler {
	rsc := events.RepoStreamCallbacks{
		RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
			return commitCallback(ctx, evt)
		},
	}

	// NOTE(review): 16 and 100 are presumably the worker count and queue
	// size — confirm against the indigo parallel scheduler.
	return parallel.NewScheduler(16, 100, "firehose", rsc.EventHandler)
}

// main wires the databases, the backfiller and the firehose consumer
// together, then blocks until an exit signal arrives or the stream closes,
// after which it stops the backfiller, flushes cursors and closes the
// databases.
func main() {
	sl := slog.With("source", "backfiller")

	// Run the migrations before anything is started so that a schema failure
	// aborts the process while there is nothing to tear down yet.
	stateDb := NewDatabase("state.db")
	if err := stateDb.AutoMigrate(&backfill.GormDBJob{}); err != nil {
		sl.Error("failed to migrate backfill job table", "err", err)
		os.Exit(1)
	}
	if err := stateDb.AutoMigrate(&cursorRecord{}); err != nil {
		sl.Error("failed to migrate cursor table", "err", err)
		os.Exit(1)
	}

	contentDb := NewDatabase("database.db")
	if err := contentDb.AutoMigrate(&FeedGenerator{}); err != nil {
		sl.Error("failed to migrate feed generator table", "err", err)
		os.Exit(1)
	}

	streamClosed := make(chan struct{})
	streamCtx, streamCancel := context.WithCancel(context.Background())

	backend := NewBackend(stateDb, contentDb)

	bf := NewBackfiller(stateDb, backend.HandleCreateOp, backend.HandleUpdateOp, backend.HandleDeleteOp)
	go bf.Start()

	// attach the backfiller to the backend so the cursors, the pump,
	// and the commit callback can use it
	backend.bf = bf

	go backend.SyncCursors(streamCtx)

	// stopBackfiller gives the backfiller up to one minute to stop.
	stopBackfiller := func() {
		endctx, cancel := context.WithTimeout(context.Background(), time.Minute)
		defer cancel()
		bf.Stop(endctx)
	}

	// closeBackend persists the cursors and then closes the databases.
	closeBackend := func() {
		sl.Info("flushing cursors")
		if err := backend.FlushCursors(); err != nil {
			sl.Error("failed to flush cursors on close", "err", err)
		}

		sl.Info("closing databases")
		if err := backend.CleanlyClose(); err != nil {
			sl.Error("failed to close databases", "err", err)
		}
	}

	// A failed cursor load is logged only; the (empty) cursor is still used.
	cursor, err := backend.LoadCursor("firehose")
	if err != nil {
		sl.Error("failed loading firehose cursor", "err", err)
	}

	conn := NewFirehose(streamCtx, cursor)
	if conn == nil {
		// NewFirehose has already logged the dial error. Consuming a nil
		// connection would panic, so undo what was started and exit.
		sl.Error("no firehose connection, shutting down")
		streamCancel()
		stopBackfiller()
		closeBackend()
		os.Exit(1)
	}

	sched := NewScheduler(streamCtx, backend.RepoCommitHandler)
	go func() {
		if err := events.HandleRepoStream(streamCtx, conn, sched, sl); err != nil {
			sl.Error("failed to start scheduler", "err", err)
		}
		// Signals the shutdown goroutine that the stream consumer returned.
		close(streamClosed)
	}()

	go func() {
		if err := backend.PumpRepos(streamCtx); err != nil {
			sl.Error("failed pumping repos", "err", err)
		}
	}()

	quit := make(chan struct{})
	exitSignals := make(chan os.Signal, 1)
	signal.Notify(exitSignals, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		// Shutdown begins on an OS signal or when the stream ends by itself.
		select {
		case sig := <-exitSignals:
			sl.Info("received OS exit signal", "signal", sig)
		case <-streamClosed:
		}

		conn.Close()
		streamCancel()

		// Wait for the stream consumer to return before stopping the
		// backfiller.
		<-streamClosed
		time.Sleep(time.Millisecond * 100)

		stopBackfiller()
		close(quit)
	}()

	<-quit

	closeBackend()
}