an app.bsky.* indexer
1package main 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "os/signal" 8 "syscall" 9 "time" 10 11 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/backfill" 13 "github.com/bluesky-social/indigo/events" 14 "github.com/bluesky-social/indigo/events/schedulers/parallel" 15 "github.com/gorilla/websocket" 16 "gorm.io/gorm" 17) 18 19type App struct { 20 backfill *backfill.Backfiller 21 cursor *CursorService 22 handler *HandlerService 23 census *CensusService 24 wsconn *websocket.Conn 25 state *gorm.DB 26 content *gorm.DB 27} 28 29func NewApp() *App { 30 stateDatabase := NewDatabase("state.db") 31 stateDatabase.AutoMigrate(&backfill.GormDBJob{}) 32 33 contentDatabase := NewDatabase("content.db") 34 35 return &App{ 36 state: stateDatabase, 37 content: contentDatabase, 38 } 39} 40 41func (app *App) Start(ctx context.Context) error { 42 app.cursor = NewCursorService(app.state) 43 go app.cursor.CheckpointCursors(ctx) 44 45 app.handler = NewHandlerService(app.content) 46 47 app.backfill = NewBackfillService(backfill.NewGormstore(app.state), app.handler) 48 go app.backfill.Start() 49 50 app.census = NewCensusService(app.cursor, app.backfill) 51 go app.census.Start(ctx) 52 53 wsconn, err := NewFirehoseConnection(ctx, app.cursor) 54 if err != nil { 55 return fmt.Errorf("error connecting to relay: %w", err) 56 } 57 app.wsconn = wsconn 58 59 rsc := events.RepoStreamCallbacks{ 60 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 61 app.cursor.SetFirehoseCursor(evt.Seq) 62 return app.backfill.HandleEvent(ctx, evt) 63 }, 64 // TODO account 65 // TODO identity 66 } 67 68 sched := parallel.NewScheduler(4, 50, "firehose", rsc.EventHandler) 69 70 if err := events.HandleRepoStream(ctx, app.wsconn, sched, nil); err != nil { 71 return fmt.Errorf("error starting repo stream handler: %w", err) 72 } 73 74 return nil 75} 76 77func (app *App) Stop(ctx context.Context) error { 78 closeDatabase := func(db *gorm.DB) error { 79 raw, err := db.DB() 80 if err != nil { 81 return fmt.Errorf("error getting raw DB: %w", err) 82 } 83 if err := raw.Close(); err != nil { 84 return fmt.Errorf("error closing DB: %w", err) 85 } 86 return nil 87 } 88 89 app.backfill.Stop(ctx) 90 91 closeDatabase(app.state) 92 closeDatabase(app.content) 93 94 return nil 95} 96 97func main() { 98 ctx, cancel := signal.NotifyContext(context.TODO(), syscall.SIGINT, syscall.SIGTERM) 99 defer cancel() 100 101 app := NewApp() 102 if err := app.Start(ctx); err != nil { 103 slog.Error("failed to start backfiller", "err", err) 104 } 105 106 <-ctx.Done() 107 slog.Info("shutting down") 108 109 endctx, cancel := context.WithTimeout(context.TODO(), time.Second*15) 110 defer cancel() 111 112 if err := app.Stop(endctx); err != nil { 113 slog.Error("error during shutdown", "err", err) 114 } 115}