an app.bsky.* indexer
1package main 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "strconv" 8 "sync" 9 "time" 10 11 "gorm.io/gorm" 12) 13 14type cursorRecord struct { 15 ID uint `gorm:"primaryKey"` 16 Key string 17 Val string 18} 19 20type CursorService struct { 21 store *gorm.DB 22 23 firehoseLk sync.Mutex 24 firehoseSeq string 25 26 reposLk sync.Mutex 27 reposSeq string 28} 29 30func NewCursorService(store *gorm.DB) *CursorService { 31 store.AutoMigrate(&cursorRecord{}) 32 33 var rec cursorRecord 34 store.First(&rec, 1) 35 if rec.ID == 0 { 36 store.Create(&cursorRecord{ID: 1, Key: "firehose", Val: ""}) 37 } 38 39 store.First(&rec, 2) 40 if rec.ID == 0 { 41 store.Create(&cursorRecord{ID: 2, Key: "repos", Val: ""}) 42 } 43 44 return &CursorService{ 45 store: store, 46 } 47} 48 49func (cs *CursorService) Get(key string) (string, error) { 50 var rec cursorRecord 51 if err := cs.store.Where("key = ?", key).First(&rec).Error; err != nil { 52 return "", fmt.Errorf("error fetching cursor record: %w", err) 53 } 54 return rec.Val, nil 55} 56 57func (cs *CursorService) SetFirehoseCursor(seq int64) { 58 cs.firehoseLk.Lock() 59 val := strconv.Itoa(int(seq)) 60 cs.firehoseSeq = val 61 cs.firehoseLk.Unlock() 62} 63 64func (cs *CursorService) SetReposCursor(value string) { 65 cs.reposLk.Lock() 66 cs.reposSeq = value 67 cs.reposLk.Unlock() 68} 69 70func (cs *CursorService) Flush() error { 71 flusher := func(lk *sync.Mutex, key, value string) error { 72 lk.Lock() 73 if err := cs.store.Model(&cursorRecord{}).Where("key = ?", key).Update("val", value).Error; err != nil { 74 return fmt.Errorf("error updating cursor record: %+v: %w", cursorRecord{Key: key, Val: value}, err) 75 } 76 lk.Unlock() 77 return nil 78 } 79 80 if err := flusher(&cs.firehoseLk, "firehose", cs.firehoseSeq); err != nil { 81 return err 82 } 83 84 if err := flusher(&cs.reposLk, "repos", cs.reposSeq); err != nil { 85 return err 86 } 87 88 return nil 89} 90 91func (cs *CursorService) CheckpointCursors(ctx context.Context) { 92 t := time.NewTicker(time.Second * 5) 93 defer t.Stop() 94 95 for { 96 select { 97 case <-ctx.Done(): 98 slog.Info("stopping cursor checkpointer") 99 return 100 case <-t.C: 101 } 102 103 slog.Info("flushing cursors") 104 if err := cs.Flush(); err != nil { 105 slog.Error("error flushing cursors", "err", err) 106 return 107 } 108 } 109}