an app.bsky.* indexer
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "sync"
8 "time"
9
10 "gorm.io/gorm"
11)
12
// cursorRecord is the persisted form of a single named cursor: one row per
// cursor, looked up by Key, with the cursor position stored as text in Val.
type cursorRecord struct {
	// ID is the row's primary key.
	ID uint `gorm:"primaryKey"`
	// Key names the cursor (this file uses "firehose" and "repos").
	Key string
	// Val holds the cursor position serialized as a string.
	Val string
}
18
19type CursorService struct {
20 store *gorm.DB
21
22 firehoseLk sync.Mutex
23 firehoseSeq int64
24
25 reposLk sync.Mutex
26 reposSeq string
27}
28
29func NewCursorService(store *gorm.DB) *CursorService {
30 store.AutoMigrate(&cursorRecord{})
31
32 var rec cursorRecord
33 store.First(&rec, 1)
34 if rec.ID == 0 {
35 store.Create(&cursorRecord{ID: 1, Key: "firehose", Val: ""})
36 }
37
38 store.First(&rec, 2)
39 if rec.ID == 0 {
40 store.Create(&cursorRecord{ID: 2, Key: "repos", Val: ""})
41 }
42
43 return &CursorService{
44 store: store,
45 }
46}
47
48func (cs *CursorService) Get(key string) (string, error) {
49 var rec cursorRecord
50 if err := cs.store.Where("key = ?", key).First(&rec).Error; err != nil {
51 return "", fmt.Errorf("error fetching cursor record: %w", err)
52 }
53 return rec.Val, nil
54}
55
56func (cs *CursorService) SetFirehoseCursor(seq int64) {
57 cs.firehoseLk.Lock()
58 cs.firehoseSeq = seq
59 cs.firehoseLk.Unlock()
60}
61
62func (cs *CursorService) SetReposCursor(value string) {
63 cs.reposLk.Lock()
64 cs.reposSeq = value
65 cs.reposLk.Unlock()
66}
67
68func (cs *CursorService) Flush() error {
69 cs.firehoseLk.Lock()
70 if err := cs.store.Model(&cursorRecord{}).Where("key = ?", "firehose").Update("val", cs.firehoseSeq).Error; err != nil {
71 return fmt.Errorf("error updating cursor record: %w", err)
72 }
73 cs.firehoseLk.Unlock()
74
75 cs.reposLk.Lock()
76 if err := cs.store.Model(&cursorRecord{}).Where("key = ?", "repos").Update("val", cs.reposSeq).Error; err != nil {
77 return fmt.Errorf("error updating cursor record: %w", err)
78 }
79 cs.reposLk.Unlock()
80
81 return nil
82}
83
84func (cs *CursorService) CheckpointCursors(ctx context.Context) {
85 t := time.NewTicker(time.Second * 5)
86 defer t.Stop()
87
88 for {
89 select {
90 case <-ctx.Done():
91 slog.Info("stopping cursor checkpointer")
92 return
93 case <-t.C:
94 }
95
96 slog.Info("flushing cursors")
97 if err := cs.Flush(); err != nil {
98 slog.Error("error flushing cursors", "err", err)
99 return
100 }
101 }
102}