an app.bsky.* indexer

cursor persist

Changed files
+38 -28
cmd
+1 -1
cmd/backfiller/backend.go
···
bf *backfill.Backfiller
firehoseLk sync.Mutex
-
firehoseSeq int64
+
firehoseSeq string
reposLk sync.Mutex
reposSeq string
}
+19 -15
cmd/backfiller/cursors.go
···
package main
import (
+
"errors"
+
"fmt"
"log/slog"
"time"
+
+
"gorm.io/gorm"
)
type cursorRecord struct {
ID uint `gorm:"primaryKey"`
-
Key string
-
Val any
+
Key string `gorm:"unique"`
+
Val string
}
-
func (b *Backend) LoadCursor(key string) (any, error) {
+
func (b *Backend) LoadCursor(key string) (string, error) {
var rec cursorRecord
-
if err := b.state.Find(&rec, "key = ?", key).Error; err != nil {
-
return nil, err
-
}
-
-
if rec.ID == 0 {
-
if err := b.state.Create(&cursorRecord{Key: key}).Error; err != nil {
-
return nil, err
+
if err := b.state.Where("key = ?", key).First(&rec).Error; err != nil {
+
if errors.Is(err, gorm.ErrRecordNotFound) {
+
b.state.Create(&cursorRecord{Key: key, Val: ""})
}
+
return "", err
}
-
return rec.Val, nil
}
func (b *Backend) FlushCursors() error {
+
sl := slog.With("source", "flushCursors")
+
b.firehoseLk.Lock()
-
if err := b.state.Model(cursorRecord{}).Where("key = 'firehose'").Update("val", b.firehoseSeq).Error; err != nil {
-
return err
+
sl.Info("persisting firehose cursor", "cursor", b.firehoseSeq)
+
if err := b.state.Model(&cursorRecord{}).Where("key = ?", "firehose").Update("val", b.firehoseSeq).Error; err != nil {
+
return fmt.Errorf("failed to persist firehose cursor: %w", err)
}
b.firehoseLk.Unlock()
b.reposLk.Lock()
-
if err := b.state.Model(cursorRecord{}).Where("key = 'repos'").Update("val", b.reposSeq).Error; err != nil {
-
return err
+
sl.Info("persisting repos cursor", "cursor", b.reposSeq)
+
if err := b.state.Model(&cursorRecord{}).Where("key = ?", "repos").Update("val", b.reposSeq).Error; err != nil {
+
return fmt.Errorf("failed to persist repos cursor: %w", err)
}
b.reposLk.Unlock()
+2 -1
cmd/backfiller/handlers.go
···
"context"
"errors"
"fmt"
+
"strconv"
comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/backfill"
···
func (b *Backend) RepoCommitHandler(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
b.firehoseLk.Lock()
-
b.firehoseSeq = evt.Seq
+
b.firehoseSeq = strconv.Itoa(int(evt.Seq))
b.firehoseLk.Unlock()
job, err := b.bf.Store.GetJob(ctx, evt.Repo)
+11 -8
cmd/backfiller/main.go
···
import (
"context"
-
"fmt"
"log"
"log/slog"
"net/http"
···
func NewBackfiller(db *gorm.DB) *backfill.Backfiller {
opts := &backfill.BackfillOptions{
-
ParallelBackfills: 50,
-
ParallelRecordCreates: 25,
-
SyncRequestsPerSecond: 25,
-
RelayHost: "https://bsky.network",
+
// ParallelBackfills: 50,
+
// ParallelRecordCreates: 25,
+
// SyncRequestsPerSecond: 25,
+
+
ParallelBackfills: 10,
+
ParallelRecordCreates: 5,
+
SyncRequestsPerSecond: 5,
+
+
RelayHost: "https://bsky.network",
}
return backfill.NewBackfiller(
"backfills",
···
if err != nil {
sl.Error("failed loading firehose cursor", "err", err)
}
-
cc, ok := cursor.(int64)
-
if ok {
-
subscribeUrl += fmt.Sprintf("?cursor=%d", cc)
+
if cursor != "" {
+
subscribeUrl += "?cursor=" + cursor
}
conn, _, err := websocket.DefaultDialer.Dial(subscribeUrl, http.Header{
+5 -3
cmd/backfiller/pump.go
···
return fmt.Errorf("configured job store doesn't support random job creation")
}
-
cursor, err := backend.LoadCursor("repos")
+
curs, err := backend.LoadCursor("repos")
if err != nil {
sl.Error("failed to load repos cursor", "err", err)
}
-
curs, _ := cursor.(string)
for {
select {
···
}
if res.Cursor != nil && *res.Cursor != "" {
-
cursor = *res.Cursor
+
curs = *res.Cursor
+
backend.reposLk.Lock()
+
backend.reposSeq = curs
+
backend.reposLk.Unlock()
} else {
break
}