an app.bsky.* indexer
1package main 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "log/slog" 8 "time" 9 10 "gorm.io/gorm" 11) 12 13type cursorRecord struct { 14 ID uint `gorm:"primaryKey"` 15 Key string `gorm:"unique"` 16 Val string 17} 18 19func (b *Backend) LoadCursor(key string) (string, error) { 20 var rec cursorRecord 21 if err := b.state.Where("key = ?", key).First(&rec).Error; err != nil { 22 if errors.Is(err, gorm.ErrRecordNotFound) { 23 b.state.Create(&cursorRecord{Key: key, Val: ""}) 24 } 25 return "", err 26 } 27 return rec.Val, nil 28} 29 30func (b *Backend) FlushCursors() error { 31 sl := slog.With("source", "flushCursors") 32 33 b.firehoseLk.Lock() 34 sl.Info("persisting firehose cursor", "cursor", b.firehoseSeq) 35 if err := b.state.Model(&cursorRecord{}).Where("key = ?", "firehose").Update("val", b.firehoseSeq).Error; err != nil { 36 return fmt.Errorf("failed to persist firehose cursor: %w", err) 37 } 38 b.firehoseLk.Unlock() 39 40 b.reposLk.Lock() 41 sl.Info("persisting repos cursor", "cursor", b.reposSeq) 42 if err := b.state.Model(&cursorRecord{}).Where("key = ?", "repos").Update("val", b.reposSeq).Error; err != nil { 43 return fmt.Errorf("failed to persist repos cursor: %w", err) 44 } 45 b.reposLk.Unlock() 46 47 return nil 48} 49 50func (b *Backend) SyncCursors(ctx context.Context) error { 51 for range time.Tick(time.Second * 5) { 52 select { 53 case <-ctx.Done(): 54 return nil 55 default: 56 // 57 } 58 59 if err := b.FlushCursors(); err != nil { 60 slog.Error("failed to flush cursors", "err", err) 61 return fmt.Errorf("failed to flush cursors: %w", err) 62 } 63 } 64 65 return nil 66}