an app.bsky.* indexer
1package main 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "sync" 8 "time" 9 10 "gorm.io/gorm" 11) 12 13type cursorRecord struct { 14 ID uint `gorm:"primaryKey"` 15 Key string 16 Val string 17} 18 19type CursorService struct { 20 store *gorm.DB 21 22 firehoseLk sync.Mutex 23 firehoseSeq int64 24 25 reposLk sync.Mutex 26 reposSeq string 27} 28 29func NewCursorService(store *gorm.DB) *CursorService { 30 store.AutoMigrate(&cursorRecord{}) 31 32 var rec cursorRecord 33 store.First(&rec, 1) 34 if rec.ID == 0 { 35 store.Create(&cursorRecord{ID: 1, Key: "firehose", Val: ""}) 36 } 37 38 store.First(&rec, 2) 39 if rec.ID == 0 { 40 store.Create(&cursorRecord{ID: 2, Key: "repos", Val: ""}) 41 } 42 43 return &CursorService{ 44 store: store, 45 } 46} 47 48func (cs *CursorService) Get(key string) (string, error) { 49 var rec cursorRecord 50 if err := cs.store.Where("key = ?", key).First(&rec).Error; err != nil { 51 return "", fmt.Errorf("error fetching cursor record: %w", err) 52 } 53 return rec.Val, nil 54} 55 56func (cs *CursorService) SetFirehoseCursor(seq int64) { 57 cs.firehoseLk.Lock() 58 cs.firehoseSeq = seq 59 cs.firehoseLk.Unlock() 60} 61 62func (cs *CursorService) SetReposCursor(value string) { 63 cs.reposLk.Lock() 64 cs.reposSeq = value 65 cs.reposLk.Unlock() 66} 67 68func (cs *CursorService) Flush() error { 69 cs.firehoseLk.Lock() 70 if err := cs.store.Model(&cursorRecord{}).Where("key = ?", "firehose").Update("val", cs.firehoseSeq).Error; err != nil { 71 return fmt.Errorf("error updating cursor record: %w", err) 72 } 73 cs.firehoseLk.Unlock() 74 75 cs.reposLk.Lock() 76 if err := cs.store.Model(&cursorRecord{}).Where("key = ?", "repos").Update("val", cs.reposSeq).Error; err != nil { 77 return fmt.Errorf("error updating cursor record: %w", err) 78 } 79 cs.reposLk.Unlock() 80 81 return nil 82} 83 84func (cs *CursorService) CheckpointCursors(ctx context.Context) { 85 t := time.NewTicker(time.Second * 5) 86 defer t.Stop() 87 88 for { 89 select { 90 case <-ctx.Done(): 91 slog.Info("stopping cursor checkpointer") 92 return 93 case <-t.C: 94 } 95 96 slog.Info("flushing cursors") 97 if err := cs.Flush(); err != nil { 98 slog.Error("error flushing cursors", "err", err) 99 return 100 } 101 } 102}