package main import ( "context" "fmt" "log/slog" "os/signal" "syscall" "time" comatproto "github.com/bluesky-social/indigo/api/atproto" "github.com/bluesky-social/indigo/backfill" "github.com/bluesky-social/indigo/events" "github.com/bluesky-social/indigo/events/schedulers/parallel" "github.com/gorilla/websocket" "gorm.io/gorm" ) type App struct { backfill *backfill.Backfiller cursor *CursorService handler *HandlerService census *CensusService wsconn *websocket.Conn state *gorm.DB content *gorm.DB } func NewApp() *App { stateDatabase := NewDatabase("state.db") stateDatabase.AutoMigrate(&backfill.GormDBJob{}) contentDatabase := NewDatabase("content.db") return &App{ state: stateDatabase, content: contentDatabase, } } func (app *App) Start(ctx context.Context) error { app.cursor = NewCursorService(app.state) go app.cursor.CheckpointCursors(ctx) app.handler = NewHandlerService(app.content) app.backfill = NewBackfillService(backfill.NewGormstore(app.state), app.handler) go app.backfill.Start() app.census = NewCensusService(app.cursor, app.backfill) go app.census.Start(ctx) wsconn, err := NewFirehoseConnection(ctx, app.cursor) if err != nil { return fmt.Errorf("error connecting to relay: %w", err) } app.wsconn = wsconn rsc := events.RepoStreamCallbacks{ RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { app.cursor.SetFirehoseCursor(evt.Seq) return app.backfill.HandleEvent(ctx, evt) }, // TODO account // TODO identity } sched := parallel.NewScheduler(4, 50, "firehose", rsc.EventHandler) if err := events.HandleRepoStream(ctx, app.wsconn, sched, nil); err != nil { return fmt.Errorf("error starting repo stream handler: %w", err) } return nil } func (app *App) Stop(ctx context.Context) error { closeDatabase := func(db *gorm.DB) error { raw, err := db.DB() if err != nil { return fmt.Errorf("error getting raw DB: %w", err) } if err := raw.Close(); err != nil { return fmt.Errorf("error closing DB: %w", err) } return nil } if err := closeDatabase(app.state); err != nil { return err } if err := closeDatabase(app.content); err != nil { return err } if err := app.backfill.Stop(ctx); err != nil { return err } return nil } func main() { ctx, cancel := signal.NotifyContext(context.TODO(), syscall.SIGINT, syscall.SIGTERM) defer cancel() app := NewApp() if err := app.Start(ctx); err != nil { slog.Error("failed to start backfiller", "err", err) } <-ctx.Done() slog.Info("shutting down") endctx, cancel := context.WithTimeout(context.TODO(), time.Second*15) defer cancel() if err := app.Stop(endctx); err != nil { slog.Error("error during shutdown", "err", err) } }