package main import ( "errors" "fmt" "log/slog" "time" "gorm.io/gorm" ) type cursorRecord struct { ID uint `gorm:"primaryKey"` Key string `gorm:"unique"` Val string } func (b *Backend) LoadCursor(key string) (string, error) { var rec cursorRecord if err := b.state.Where("key = ?", key).First(&rec).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { b.state.Create(&cursorRecord{Key: key, Val: ""}) } return "", err } return rec.Val, nil } func (b *Backend) FlushCursors() error { sl := slog.With("source", "flushCursors") b.firehoseLk.Lock() sl.Info("persisting firehose cursor", "cursor", b.firehoseSeq) if err := b.state.Model(&cursorRecord{}).Where("key = ?", "firehose").Update("val", b.firehoseSeq).Error; err != nil { return fmt.Errorf("failed to persist firehose cursor: %w", err) } b.firehoseLk.Unlock() b.reposLk.Lock() sl.Info("persisting repos cursor", "cursor", b.reposSeq) if err := b.state.Model(&cursorRecord{}).Where("key = ?", "repos").Update("val", b.reposSeq).Error; err != nil { return fmt.Errorf("failed to persist repos cursor: %w", err) } b.reposLk.Unlock() return nil } func (b *Backend) SyncCursors() { for range time.Tick(time.Second * 5) { if err := b.FlushCursors(); err != nil { slog.Error("failed to flush cursors", "err", err) } } }