package main

import (
	"context"
	"log"
	"log/slog"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/backfill"
	"github.com/bluesky-social/indigo/events"
	"github.com/bluesky-social/indigo/events/schedulers/parallel"
	"github.com/gorilla/websocket"
	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
	"gorm.io/gorm/logger"
)

// NewDatabase opens the sqlite state database and migrates the tables
// used for backfill jobs and firehose cursors.
func NewDatabase() *gorm.DB {
	sl := slog.With("source", "database")

	newLogger := logger.New(
		log.New(os.Stdout, "\r\n", log.LstdFlags),
		logger.Config{
			SlowThreshold: 1 * time.Second,
			Colorful:      false,
		},
	)

	db, err := gorm.Open(sqlite.Open("state.db"), &gorm.Config{
		Logger: newLogger,
	})
	if err != nil {
		sl.Error("failed to connect to database", "err", err)
		os.Exit(1) // nothing useful can happen without the database
	}

	db.AutoMigrate(&backfill.GormDBJob{})
	db.AutoMigrate(&cursorRecord{})

	return db
}

// NewBackfiller wires up the indigo backfiller with a gorm-backed job store.
func NewBackfiller(db *gorm.DB) *backfill.Backfiller {
	opts := &backfill.BackfillOptions{
		// ParallelBackfills:     50,
		// ParallelRecordCreates: 25,
		// SyncRequestsPerSecond: 25,
		ParallelBackfills:     10,
		ParallelRecordCreates: 1, // sqlite
		SyncRequestsPerSecond: 5,
		RelayHost:             "https://bsky.network",
	}
	return backfill.NewBackfiller(
		"backfills",
		backfill.NewGormstore(db),
		handleCreate,
		handleUpdate,
		handleDelete,
		opts,
	)
}

// NewFirehose dials the relay's subscribeRepos endpoint, resuming from the
// stored cursor when one is available.
func NewFirehose(ctx context.Context, cursor string) *websocket.Conn {
	sl := slog.With("source", "firehose")

	subscribeUrl := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
	if cursor != "" {
		subscribeUrl += "?cursor=" + cursor
	}

	conn, _, err := websocket.DefaultDialer.DialContext(ctx, subscribeUrl, http.Header{
		"User-Agent": []string{"backfiller/0.1 (@edavis.dev)"},
	})
	if err != nil {
		sl.Error("failed to connect to relay", "err", err)
		os.Exit(1) // no connection means nothing to consume
	}

	return conn
}

// NewScheduler routes firehose commit events to the backend handler across
// a pool of parallel workers.
func NewScheduler(ctx context.Context, backend *Backend) *parallel.Scheduler {
	rsc := events.RepoStreamCallbacks{
		RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
			return backend.RepoCommitHandler(ctx, evt)
		},
	}
	return parallel.NewScheduler(16, 100, "firehose", rsc.EventHandler)
}

func main() {
	sl := slog.With("source", "backfiller")

	streamClosed := make(chan struct{})
	streamCtx, streamCancel := context.WithCancel(context.Background())

	db := NewDatabase()
	bf := NewBackfiller(db)
	go bf.Start()

	backend := NewBackend(db, bf)
	go backend.SyncCursors(streamCtx)

	cursor, err := backend.LoadCursor("firehose")
	if err != nil {
		sl.Error("failed loading firehose cursor", "err", err)
	}

	conn := NewFirehose(streamCtx, cursor)
	sched := NewScheduler(streamCtx, backend)

	// Consume the firehose until the connection closes or the context is canceled.
	go func() {
		if err := events.HandleRepoStream(streamCtx, conn, sched, sl); err != nil {
			sl.Error("failed to start scheduler", "err", err)
		}
		close(streamClosed)
	}()

	// Enqueue every repo listed by the relay for backfilling.
	go func() {
		if err := backend.PumpRepos(streamCtx); err != nil {
			sl.Error("failed pumping repos", "err", err)
		} else {
			sl.Info("finished listing repos, switching over to event stream")
		}
	}()

	quit := make(chan struct{})
	exitSignals := make(chan os.Signal, 1)
	signal.Notify(exitSignals, syscall.SIGINT, syscall.SIGTERM)

	// Shut down cleanly on SIGINT/SIGTERM or when the stream ends on its own.
	go func() {
		select {
		case sig := <-exitSignals:
			sl.Info("received OS exit signal", "signal", sig)
		case <-streamClosed:
			//
		}

		conn.Close()
		streamCancel()
		<-streamClosed
		time.Sleep(time.Millisecond * 100)

		endctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
		defer cancel()
		bf.Stop(endctx)

		if err := backend.FlushCursors(); err != nil {
			sl.Error("final flush cursor failed", "err", err)
		}

		close(quit)
	}()

	<-quit

	if err := backend.FlushCursors(); err != nil {
		sl.Error("failed to flush cursors on close", "err", err)
	}
}