// An app.bsky.* indexer.
1package main 2 3import ( 4 "context" 5 "log/slog" 6 "sync" 7 "time" 8 9 "gorm.io/gorm" 10) 11 12type CursorService struct { 13 store *gorm.DB 14 15 firehoseLk sync.Mutex 16 firehoseSeq int64 17} 18 19func NewCursorService(store *gorm.DB) *CursorService { 20 store.AutoMigrate(&firehoseCursor{}) 21 store.AutoMigrate(&hostCursor{}) 22 23 return &CursorService{ 24 store: store, 25 } 26} 27 28func (cs *CursorService) Checkpoint(ctx context.Context) { 29 t := time.NewTicker(time.Second * 5) 30 defer t.Stop() 31 32 for { 33 select { 34 case <-ctx.Done(): 35 slog.Info("stopping cursor checkpointer", "err", ctx.Err()) 36 return 37 case <-t.C: 38 } 39 40 slog.Info("persisting firehose cursor", "seq", cs.firehoseSeq) 41 if err := cs.PersistFirehoseCursor(); err != nil { 42 slog.Error("error persisting firehose cursor", "err", err) 43 return 44 } 45 } 46}