an app.bsky.* indexer
1package main
2
3import (
4 "errors"
5 "fmt"
6 "log/slog"
7 "time"
8
9 "gorm.io/gorm"
10)
11
// cursorRecord is the GORM model for one persisted cursor: a key/value row
// looked up by Key.
type cursorRecord struct {
	// ID is the primary key of the row.
	ID uint `gorm:"primaryKey"`
	// Key names the cursor (FlushCursors uses "firehose" and "repos"); it is
	// declared unique.
	Key string `gorm:"unique"`
	// Val is the stored cursor value.
	Val string
}
17
18func (b *Backend) LoadCursor(key string) (string, error) {
19 var rec cursorRecord
20 if err := b.state.Where("key = ?", key).First(&rec).Error; err != nil {
21 if errors.Is(err, gorm.ErrRecordNotFound) {
22 b.state.Create(&cursorRecord{Key: key, Val: ""})
23 }
24 return "", err
25 }
26 return rec.Val, nil
27}
28
29func (b *Backend) FlushCursors() error {
30 sl := slog.With("source", "flushCursors")
31
32 b.firehoseLk.Lock()
33 sl.Info("persisting firehose cursor", "cursor", b.firehoseSeq)
34 if err := b.state.Model(&cursorRecord{}).Where("key = ?", "firehose").Update("val", b.firehoseSeq).Error; err != nil {
35 return fmt.Errorf("failed to persist firehose cursor: %w", err)
36 }
37 b.firehoseLk.Unlock()
38
39 b.reposLk.Lock()
40 sl.Info("persisting repos cursor", "cursor", b.reposSeq)
41 if err := b.state.Model(&cursorRecord{}).Where("key = ?", "repos").Update("val", b.reposSeq).Error; err != nil {
42 return fmt.Errorf("failed to persist repos cursor: %w", err)
43 }
44 b.reposLk.Unlock()
45
46 return nil
47}
48
49func (b *Backend) SyncCursors() {
50 for range time.Tick(time.Second * 5) {
51 if err := b.FlushCursors(); err != nil {
52 slog.Error("failed to flush cursors", "err", err)
53 }
54 }
55}