an app.bsky.* indexer
1package main
2
3import (
4 "context"
5 "log/slog"
6 "sync"
7 "time"
8
9 "gorm.io/gorm"
10)
11
12type CursorService struct {
13 store *gorm.DB
14
15 firehoseLk sync.Mutex
16 firehoseSeq int64
17}
18
19func NewCursorService(store *gorm.DB) *CursorService {
20 store.AutoMigrate(&firehoseCursor{})
21 store.AutoMigrate(&hostCursor{})
22
23 return &CursorService{
24 store: store,
25 }
26}
27
28func (cs *CursorService) Checkpoint(ctx context.Context) {
29 t := time.NewTicker(time.Second * 5)
30 defer t.Stop()
31
32 for {
33 select {
34 case <-ctx.Done():
35 slog.Info("stopping cursor checkpointer", "err", ctx.Err())
36 return
37 case <-t.C:
38 }
39
40 slog.Info("persisting firehose cursor", "seq", cs.firehoseSeq)
41 if err := cs.PersistFirehoseCursor(); err != nil {
42 slog.Error("error persisting firehose cursor", "err", err)
43 return
44 }
45 }
46}