an app.bsky.* indexer
1package main 2 3import ( 4 "errors" 5 "fmt" 6 "log/slog" 7 "time" 8 9 "gorm.io/gorm" 10) 11 12type cursorRecord struct { 13 ID uint `gorm:"primaryKey"` 14 Key string `gorm:"unique"` 15 Val string 16} 17 18func (b *Backend) LoadCursor(key string) (string, error) { 19 var rec cursorRecord 20 if err := b.state.Where("key = ?", key).First(&rec).Error; err != nil { 21 if errors.Is(err, gorm.ErrRecordNotFound) { 22 b.state.Create(&cursorRecord{Key: key, Val: ""}) 23 } 24 return "", err 25 } 26 return rec.Val, nil 27} 28 29func (b *Backend) FlushCursors() error { 30 sl := slog.With("source", "flushCursors") 31 32 b.firehoseLk.Lock() 33 sl.Info("persisting firehose cursor", "cursor", b.firehoseSeq) 34 if err := b.state.Model(&cursorRecord{}).Where("key = ?", "firehose").Update("val", b.firehoseSeq).Error; err != nil { 35 return fmt.Errorf("failed to persist firehose cursor: %w", err) 36 } 37 b.firehoseLk.Unlock() 38 39 b.reposLk.Lock() 40 sl.Info("persisting repos cursor", "cursor", b.reposSeq) 41 if err := b.state.Model(&cursorRecord{}).Where("key = ?", "repos").Update("val", b.reposSeq).Error; err != nil { 42 return fmt.Errorf("failed to persist repos cursor: %w", err) 43 } 44 b.reposLk.Unlock() 45 46 return nil 47} 48 49func (b *Backend) SyncCursors() { 50 for range time.Tick(time.Second * 5) { 51 if err := b.FlushCursors(); err != nil { 52 slog.Error("failed to flush cursors", "err", err) 53 } 54 } 55}