an app.bsky.* indexer

persist firehose cursor

Changed files
+115 -24
cmd
+82
cmd/backfiller/backend.go
···
+
package main
+
+
import (
+
"context"
+
"errors"
+
"fmt"
+
"log/slog"
+
"sync"
+
"time"
+
+
comatproto "github.com/bluesky-social/indigo/api/atproto"
+
"github.com/bluesky-social/indigo/backfill"
+
"gorm.io/gorm"
+
)
+
+
type Backend struct {
+
state *gorm.DB
+
bf *backfill.Backfiller
+
seqLk sync.Mutex
+
lastSeq int64
+
}
+
+
func NewBackend(state *gorm.DB, bf *backfill.Backfiller) *Backend {
+
return &Backend{
+
state: state,
+
bf: bf,
+
}
+
}
+
+
func (b *Backend) RepoCommitHandler(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
+
b.seqLk.Lock()
+
if evt.Seq > b.lastSeq {
+
b.lastSeq = evt.Seq
+
}
+
b.seqLk.Unlock()
+
+
job, err := b.bf.Store.GetJob(ctx, evt.Repo)
+
if job == nil {
+
if errors.Is(err, backfill.ErrJobNotFound) {
+
return nil
+
} else {
+
return fmt.Errorf("error getting job: %w", err)
+
}
+
} else {
+
return b.bf.HandleEvent(ctx, evt)
+
}
+
}
+
+
func (b *Backend) LoadCursor() (int, error) {
+
var rec cursorRecord
+
if err := b.state.Find(&rec, "id = 1").Error; err != nil {
+
return 0, err
+
}
+
+
if rec.ID == 0 {
+
if err := b.state.Create(&cursorRecord{ID: 1}).Error; err != nil {
+
return 0, err
+
}
+
}
+
+
return rec.Val, nil
+
}
+
+
func (b *Backend) FlushCursor() error {
+
b.seqLk.Lock()
+
v := b.lastSeq
+
b.seqLk.Unlock()
+
+
if err := b.state.Model(cursorRecord{}).Where("id = 1 and val < ?", v).Update("val", v).Error; err != nil {
+
return err
+
}
+
+
return nil
+
}
+
+
func (b *Backend) syncCursorRoutine() {
+
for range time.Tick(time.Second * 5) {
+
if err := b.FlushCursor(); err != nil {
+
slog.Error("failed to flush cursor", "err", err)
+
}
+
}
+
}
+33 -24
cmd/backfiller/backfiller.go
···
import (
"context"
-
"errors"
"fmt"
"log"
"log/slog"
"net/http"
"os"
"os/signal"
+
"strconv"
"syscall"
"time"
"github.com/bluesky-social/indigo/api/atproto"
-
comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/backfill"
"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/events/schedulers/parallel"
···
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
+
+
type cursorRecord struct {
+
ID uint `gorm:"primaryKey"`
+
Val int
+
}
func handleCreate(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error {
return nil
···
newLogger := logger.New(
log.New(os.Stdout, "\n", log.LstdFlags),
logger.Config{
-
SlowThreshold: 5*time.Second,
-
Colorful: false,
+
SlowThreshold: 5 * time.Second,
+
Colorful: false,
},
)
db, err := gorm.Open(sqlite.Open("state.db"), &gorm.Config{
···
sl.Error("failed to connect to database", "err", err)
}
db.AutoMigrate(&backfill.GormDBJob{})
+
db.AutoMigrate(&cursorRecord{})
store := backfill.NewGormstore(db)
-
// connect to the relay
-
con, _, err := websocket.DefaultDialer.Dial("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos", http.Header{
-
"User-Agent": []string{"backfiller/0.1 (@edavis.dev)"},
-
})
-
if err != nil {
-
sl.Error("failed to connect to relay", "err", err)
-
}
-
-
// start backfilling
+
// create and start the backfiller
opts := &backfill.BackfillOptions{
ParallelBackfills: 10,
ParallelRecordCreates: 100,
···
bf := backfill.NewBackfiller("backfills", store, handleCreate, handleUpdate, handleDelete, opts)
go bf.Start()
+
// set up the backend service
+
backend := NewBackend(db, bf)
+
go backend.syncCursorRoutine()
+
+
// connect to the relay
+
subscribeUrl := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
+
curs, err := backend.LoadCursor()
+
if err != nil {
+
sl.Error("error loading firehose cursor", "err", err)
+
}
+
if curs > 0 {
+
subscribeUrl += "?cursor=" + strconv.Itoa(curs)
+
}
+
+
con, _, err := websocket.DefaultDialer.Dial(subscribeUrl, http.Header{
+
"User-Agent": []string{"backfiller/0.1 (@edavis.dev)"},
+
})
+
if err != nil {
+
sl.Error("failed to connect to relay", "err", err)
+
}
+
// pump repos
go func(ctx context.Context, bf *backfill.Backfiller) {
if err := pumpRepos(ctx, bf); err != nil {
···
// read from the firehose
rsc := events.RepoStreamCallbacks{
-
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
-
job, err := bf.Store.GetJob(ctx, evt.Repo)
-
if job == nil {
-
if errors.Is(err, backfill.ErrJobNotFound) {
-
return nil
-
} else {
-
return fmt.Errorf("error getting job: %w", err)
-
}
-
} else {
-
return bf.HandleEvent(ctx, evt)
-
}
+
RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
+
return backend.RepoCommitHandler(ctx, evt)
},
}
sched := parallel.NewScheduler(16, 100, "firehose", rsc.EventHandler)