// An app.bsky.* indexer.
1package main
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "log/slog"
8 "time"
9
10 "gorm.io/gorm"
11)
12
// cursorRecord is the gorm model for one persisted cursor: a named key and
// its stored string value.
type cursorRecord struct {
	// ID is the primary key of the row.
	ID uint `gorm:"primaryKey"`
	// Key is the cursor's name; the unique tag allows at most one row per key.
	Key string `gorm:"unique"`
	// Val is the stored cursor value.
	Val string
}
18
19func (b *Backend) LoadCursor(key string) (string, error) {
20 var rec cursorRecord
21 if err := b.state.Where("key = ?", key).First(&rec).Error; err != nil {
22 if errors.Is(err, gorm.ErrRecordNotFound) {
23 b.state.Create(&cursorRecord{Key: key, Val: ""})
24 }
25 return "", err
26 }
27 return rec.Val, nil
28}
29
30func (b *Backend) FlushCursors() error {
31 sl := slog.With("source", "flushCursors")
32
33 b.firehoseLk.Lock()
34 sl.Info("persisting firehose cursor", "cursor", b.firehoseSeq)
35 if err := b.state.Model(&cursorRecord{}).Where("key = ?", "firehose").Update("val", b.firehoseSeq).Error; err != nil {
36 return fmt.Errorf("failed to persist firehose cursor: %w", err)
37 }
38 b.firehoseLk.Unlock()
39
40 b.reposLk.Lock()
41 sl.Info("persisting repos cursor", "cursor", b.reposSeq)
42 if err := b.state.Model(&cursorRecord{}).Where("key = ?", "repos").Update("val", b.reposSeq).Error; err != nil {
43 return fmt.Errorf("failed to persist repos cursor: %w", err)
44 }
45 b.reposLk.Unlock()
46
47 return nil
48}
49
50func (b *Backend) SyncCursors(ctx context.Context) error {
51 for range time.Tick(time.Second * 5) {
52 select {
53 case <-ctx.Done():
54 return nil
55 default:
56 //
57 }
58
59 if err := b.FlushCursors(); err != nil {
60 slog.Error("failed to flush cursors", "err", err)
61 return fmt.Errorf("failed to flush cursors: %w", err)
62 }
63 }
64
65 return nil
66}