an app.bsky.* indexer
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "strconv"
8 "sync"
9 "time"
10
11 "gorm.io/gorm"
12)
13
// cursorRecord is the database row that stores one named cursor: Key names
// the cursor (this file uses "firehose" and "repos") and Val holds its last
// saved value as a string.
type cursorRecord struct {
	ID       uint `gorm:"primaryKey"`
	Key, Val string
}
19
20type CursorService struct {
21 store *gorm.DB
22
23 firehoseLk sync.Mutex
24 firehoseSeq string
25
26 reposLk sync.Mutex
27 reposSeq string
28}
29
30func NewCursorService(store *gorm.DB) *CursorService {
31 store.AutoMigrate(&cursorRecord{})
32
33 var rec cursorRecord
34 store.First(&rec, 1)
35 if rec.ID == 0 {
36 store.Create(&cursorRecord{ID: 1, Key: "firehose", Val: ""})
37 }
38
39 store.First(&rec, 2)
40 if rec.ID == 0 {
41 store.Create(&cursorRecord{ID: 2, Key: "repos", Val: ""})
42 }
43
44 return &CursorService{
45 store: store,
46 }
47}
48
49func (cs *CursorService) Get(key string) (string, error) {
50 var rec cursorRecord
51 if err := cs.store.Where("key = ?", key).First(&rec).Error; err != nil {
52 return "", fmt.Errorf("error fetching cursor record: %w", err)
53 }
54 return rec.Val, nil
55}
56
57func (cs *CursorService) SetFirehoseCursor(seq int64) {
58 cs.firehoseLk.Lock()
59 val := strconv.Itoa(int(seq))
60 cs.firehoseSeq = val
61 cs.firehoseLk.Unlock()
62}
63
64func (cs *CursorService) SetReposCursor(value string) {
65 cs.reposLk.Lock()
66 cs.reposSeq = value
67 cs.reposLk.Unlock()
68}
69
70func (cs *CursorService) Flush() error {
71 flusher := func(lk *sync.Mutex, key, value string) error {
72 lk.Lock()
73 if err := cs.store.Model(&cursorRecord{}).Where("key = ?", key).Update("val", value).Error; err != nil {
74 return fmt.Errorf("error updating cursor record: %+v: %w", cursorRecord{Key: key, Val: value}, err)
75 }
76 lk.Unlock()
77 return nil
78 }
79
80 if err := flusher(&cs.firehoseLk, "firehose", cs.firehoseSeq); err != nil {
81 return err
82 }
83
84 if err := flusher(&cs.reposLk, "repos", cs.reposSeq); err != nil {
85 return err
86 }
87
88 return nil
89}
90
91func (cs *CursorService) CheckpointCursors(ctx context.Context) {
92 t := time.NewTicker(time.Second * 5)
93 defer t.Stop()
94
95 for {
96 select {
97 case <-ctx.Done():
98 slog.Info("stopping cursor checkpointer")
99 return
100 case <-t.C:
101 }
102
103 slog.Info("flushing cursors")
104 if err := cs.Flush(); err != nil {
105 slog.Error("error flushing cursors", "err", err)
106 return
107 }
108 }
109}