an app.bsky.* indexer

reorg

Changed files
+291 -232
cmd
+7 -64
cmd/backfiller/backend.go
···
package main
import (
-
"context"
-
"errors"
-
"fmt"
-
"log/slog"
"sync"
-
"time"
-
comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/backfill"
"gorm.io/gorm"
)
type Backend struct {
-
state *gorm.DB
-
bf *backfill.Backfiller
-
seqLk sync.Mutex
-
lastSeq int64
+
state *gorm.DB
+
bf *backfill.Backfiller
+
+
firehoseLk sync.Mutex
+
firehoseSeq int64
+
reposLk sync.Mutex
+
reposSeq string
}
func NewBackend(state *gorm.DB, bf *backfill.Backfiller) *Backend {
···
bf: bf,
}
}
-
-
func (b *Backend) RepoCommitHandler(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
-
b.seqLk.Lock()
-
if evt.Seq > b.lastSeq {
-
b.lastSeq = evt.Seq
-
}
-
b.seqLk.Unlock()
-
-
job, err := b.bf.Store.GetJob(ctx, evt.Repo)
-
if job == nil {
-
if errors.Is(err, backfill.ErrJobNotFound) {
-
return nil
-
} else {
-
return fmt.Errorf("error getting job: %w", err)
-
}
-
} else {
-
return b.bf.HandleEvent(ctx, evt)
-
}
-
}
-
-
func (b *Backend) LoadCursor() (int, error) {
-
var rec cursorRecord
-
if err := b.state.Find(&rec, "id = 1").Error; err != nil {
-
return 0, err
-
}
-
-
if rec.ID == 0 {
-
if err := b.state.Create(&cursorRecord{ID: 1}).Error; err != nil {
-
return 0, err
-
}
-
}
-
-
return rec.Val, nil
-
}
-
-
func (b *Backend) FlushCursor() error {
-
b.seqLk.Lock()
-
v := b.lastSeq
-
b.seqLk.Unlock()
-
-
if err := b.state.Model(cursorRecord{}).Where("id = 1 and val < ?", v).Update("val", v).Error; err != nil {
-
return err
-
}
-
-
return nil
-
}
-
-
func (b *Backend) syncCursorRoutine() {
-
for range time.Tick(time.Second * 5) {
-
if err := b.FlushCursor(); err != nil {
-
slog.Error("failed to flush cursor", "err", err)
-
}
-
}
-
}
-168
cmd/backfiller/backfiller.go
···
-
package main
-
-
import (
-
"context"
-
"fmt"
-
"log"
-
"log/slog"
-
"net/http"
-
"os"
-
"os/signal"
-
"strconv"
-
"syscall"
-
"time"
-
-
"github.com/bluesky-social/indigo/api/atproto"
-
"github.com/bluesky-social/indigo/backfill"
-
"github.com/bluesky-social/indigo/events"
-
"github.com/bluesky-social/indigo/events/schedulers/parallel"
-
"github.com/bluesky-social/indigo/xrpc"
-
-
"github.com/gorilla/websocket"
-
"github.com/ipfs/go-cid"
-
"gorm.io/driver/sqlite"
-
"gorm.io/gorm"
-
"gorm.io/gorm/logger"
-
)
-
-
type cursorRecord struct {
-
ID uint `gorm:"primaryKey"`
-
Val int
-
}
-
-
func handleCreate(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error {
-
return nil
-
}
-
-
func handleUpdate(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error {
-
return nil
-
}
-
-
func handleDelete(ctx context.Context, repo, rev, path string) error {
-
return nil
-
}
-
-
func main() {
-
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
-
defer stop()
-
-
sl := slog.With("source", "backfiller")
-
-
newLogger := logger.New(
-
log.New(os.Stdout, "\n", log.LstdFlags),
-
logger.Config{
-
SlowThreshold: 5 * time.Second,
-
Colorful: false,
-
},
-
)
-
db, err := gorm.Open(sqlite.Open("state.db"), &gorm.Config{
-
Logger: newLogger,
-
})
-
if err != nil {
-
sl.Error("failed to connect to database", "err", err)
-
}
-
db.AutoMigrate(&backfill.GormDBJob{})
-
db.AutoMigrate(&cursorRecord{})
-
store := backfill.NewGormstore(db)
-
-
// create and start the backfiller
-
opts := &backfill.BackfillOptions{
-
ParallelBackfills: 10,
-
ParallelRecordCreates: 100,
-
SyncRequestsPerSecond: 4,
-
RelayHost: "https://bsky.network",
-
}
-
bf := backfill.NewBackfiller("backfills", store, handleCreate, handleUpdate, handleDelete, opts)
-
go bf.Start()
-
-
// set up the backend service
-
backend := NewBackend(db, bf)
-
go backend.syncCursorRoutine()
-
-
// connect to the relay
-
subscribeUrl := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
-
curs, err := backend.LoadCursor()
-
if err != nil {
-
sl.Error("error loading firehose cursor", "err", err)
-
}
-
if curs > 0 {
-
subscribeUrl += "?cursor=" + strconv.Itoa(curs)
-
}
-
-
con, _, err := websocket.DefaultDialer.Dial(subscribeUrl, http.Header{
-
"User-Agent": []string{"backfiller/0.1 (@edavis.dev)"},
-
})
-
if err != nil {
-
sl.Error("failed to connect to relay", "err", err)
-
}
-
-
// pump repos
-
go func(ctx context.Context, bf *backfill.Backfiller) {
-
if err := pumpRepos(ctx, bf); err != nil {
-
sl.Error("failed pumping repos", "err", err)
-
}
-
}(ctx, bf)
-
-
// read from the firehose
-
rsc := events.RepoStreamCallbacks{
-
RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
-
return backend.RepoCommitHandler(ctx, evt)
-
},
-
}
-
sched := parallel.NewScheduler(16, 100, "firehose", rsc.EventHandler)
-
if err := events.HandleRepoStream(ctx, con, sched, nil); err != nil {
-
sl.Error("failed to start scheduler", "err", err)
-
}
-
-
<-ctx.Done()
-
bf.Stop(context.TODO())
-
}
-
-
type jobMaker interface {
-
GetOrCreateJob(context.Context, string, string) (backfill.Job, error)
-
}
-
-
func pumpRepos(ctx context.Context, bf *backfill.Backfiller) error {
-
sl := slog.With("source", "pumpRepos")
-
-
xrpcc := &xrpc.Client{
-
Host: "https://bsky.network",
-
}
-
-
jmstore, ok := bf.Store.(jobMaker)
-
if !ok {
-
return fmt.Errorf("configured job store doesn't support random job creation")
-
}
-
-
var curs string
-
for {
-
select {
-
case <-ctx.Done():
-
sl.Info("stopping repo pump")
-
return nil
-
default:
-
}
-
-
sl.Info("listing repos", "cursor", curs)
-
res, err := atproto.SyncListRepos(ctx, xrpcc, curs, 1000)
-
if err != nil {
-
return fmt.Errorf("error listing repos: %w", err)
-
}
-
-
for _, repo := range res.Repos {
-
_, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
-
if err != nil {
-
sl.Warn("failed to create backfill job", "err", err)
-
continue
-
}
-
}
-
-
if res.Cursor != nil && *res.Cursor != "" {
-
curs = *res.Cursor
-
} else {
-
break
-
}
-
}
-
-
return nil
-
}
+51
cmd/backfiller/cursors.go
···
+
package main
+
+
import (
+
"log/slog"
+
"time"
+
)
+
+
// cursorRecord is the database row holding one named cursor.
type cursorRecord struct {
	// ID is the primary key.
	ID uint `gorm:"primaryKey"`
	// Key names the cursor; callers in this package use "firehose" and "repos".
	Key string
	// Val is the stored cursor value.
	// NOTE(review): an untyped `any` column may not map cleanly through the
	// ORM; confirm that values round-trip with the type the readers assert.
	Val any
}
+
+
func (b *Backend) LoadCursor(key string) (any, error) {
+
var rec cursorRecord
+
if err := b.state.Find(&rec, "key = ?", key).Error; err != nil {
+
return nil, err
+
}
+
+
if rec.ID == 0 {
+
if err := b.state.Create(&cursorRecord{Key: key}).Error; err != nil {
+
return nil, err
+
}
+
}
+
+
return rec.Val, nil
+
}
+
+
func (b *Backend) FlushCursors() error {
+
b.firehoseLk.Lock()
+
if err := b.state.Model(cursorRecord{}).Where("key = 'firehose'").Update("val", b.firehoseSeq).Error; err != nil {
+
return err
+
}
+
b.firehoseLk.Unlock()
+
+
b.reposLk.Lock()
+
if err := b.state.Model(cursorRecord{}).Where("key = 'repos'").Update("val", b.reposSeq).Error; err != nil {
+
return err
+
}
+
b.reposLk.Unlock()
+
+
return nil
+
}
+
+
func (b *Backend) SyncCursors() {
+
for range time.Tick(time.Second * 5) {
+
if err := b.FlushCursors(); err != nil {
+
slog.Error("failed to flush cursors", "err", err)
+
}
+
}
+
}
+40
cmd/backfiller/handlers.go
···
+
package main
+
+
import (
+
"context"
+
"errors"
+
"fmt"
+
+
comatproto "github.com/bluesky-social/indigo/api/atproto"
+
"github.com/bluesky-social/indigo/backfill"
+
"github.com/ipfs/go-cid"
+
)
+
+
func (b *Backend) RepoCommitHandler(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
+
b.firehoseLk.Lock()
+
b.firehoseSeq = evt.Seq
+
b.firehoseLk.Unlock()
+
+
job, err := b.bf.Store.GetJob(ctx, evt.Repo)
+
if job == nil {
+
if errors.Is(err, backfill.ErrJobNotFound) {
+
return nil
+
} else {
+
return fmt.Errorf("error getting job: %w", err)
+
}
+
} else {
+
return b.bf.HandleEvent(ctx, evt)
+
}
+
}
+
+
func handleCreate(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error {
+
return nil
+
}
+
+
func handleUpdate(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error {
+
return nil
+
}
+
+
// handleDelete is the record-delete callback handed to the backfiller.
// It is currently a no-op that always reports success.
func handleDelete(ctx context.Context, repo, rev, path string) error {
	return nil
}
+117
cmd/backfiller/main.go
···
+
package main
+
+
import (
+
"context"
+
"fmt"
+
"log"
+
"log/slog"
+
"net/http"
+
"os"
+
"os/signal"
+
"syscall"
+
"time"
+
+
"github.com/bluesky-social/indigo/api/atproto"
+
"github.com/bluesky-social/indigo/backfill"
+
"github.com/bluesky-social/indigo/events"
+
"github.com/bluesky-social/indigo/events/schedulers/parallel"
+
+
"github.com/gorilla/websocket"
+
"gorm.io/driver/sqlite"
+
"gorm.io/gorm"
+
"gorm.io/gorm/logger"
+
)
+
+
func NewDatabase() *gorm.DB {
+
sl := slog.With("source", "database")
+
+
newLogger := logger.New(
+
log.New(os.Stdout, "\n", log.LstdFlags),
+
logger.Config{
+
SlowThreshold: 1 * time.Second,
+
Colorful: false,
+
},
+
)
+
db, err := gorm.Open(sqlite.Open("state.db"), &gorm.Config{
+
Logger: newLogger,
+
})
+
if err != nil {
+
sl.Error("failed to connect to database", "err", err)
+
}
+
db.AutoMigrate(&backfill.GormDBJob{})
+
db.AutoMigrate(&cursorRecord{})
+
return db
+
}
+
+
func NewBackfiller(db *gorm.DB) *backfill.Backfiller {
+
opts := &backfill.BackfillOptions{
+
ParallelBackfills: 50,
+
ParallelRecordCreates: 25,
+
SyncRequestsPerSecond: 25,
+
RelayHost: "https://bsky.network",
+
}
+
return backfill.NewBackfiller(
+
"backfills",
+
backfill.NewGormstore(db),
+
handleCreate,
+
handleUpdate,
+
handleDelete,
+
opts,
+
)
+
}
+
+
func NewFirehose(backend *Backend) *websocket.Conn {
+
sl := slog.With("source", "firehose")
+
+
subscribeUrl := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
+
+
cursor, err := backend.LoadCursor("firehose")
+
if err != nil {
+
sl.Error("failed loading firehose cursor", "err", err)
+
}
+
cc, ok := cursor.(int64)
+
if ok {
+
subscribeUrl += fmt.Sprintf("?cursor=%d", cc)
+
}
+
+
conn, _, err := websocket.DefaultDialer.Dial(subscribeUrl, http.Header{
+
"User-Agent": []string{"backfiller/0.1 (@edavis.dev)"},
+
})
+
if err != nil {
+
sl.Error("failed to connect to relay", "err", err)
+
}
+
return conn
+
}
+
+
func NewScheduler(ctx context.Context, backend *Backend) *parallel.Scheduler {
+
rsc := events.RepoStreamCallbacks{
+
RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
+
return backend.RepoCommitHandler(ctx, evt)
+
},
+
}
+
return parallel.NewScheduler(16, 100, "firehose", rsc.EventHandler)
+
}
+
+
func main() {
+
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
defer stop()
+
+
sl := slog.With("source", "backfiller")
+
+
db := NewDatabase()
+
+
bf := NewBackfiller(db)
+
go bf.Start()
+
+
backend := NewBackend(db, bf)
+
go backend.SyncCursors()
+
go backend.PumpRepos(ctx)
+
+
conn := NewFirehose(backend)
+
sched := NewScheduler(ctx, backend)
+
if err := events.HandleRepoStream(ctx, conn, sched, sl); err != nil {
+
sl.Error("failed to start scheduler", "err", err)
+
}
+
<-ctx.Done()
+
bf.Stop(context.TODO())
+
}
+76
cmd/backfiller/pump.go
···
+
package main
+
+
import (
+
"context"
+
"fmt"
+
"log/slog"
+
+
"github.com/bluesky-social/indigo/api/atproto"
+
"github.com/bluesky-social/indigo/backfill"
+
"github.com/bluesky-social/indigo/xrpc"
+
)
+
+
type jobMaker interface {
+
GetOrCreateJob(context.Context, string, string) (backfill.Job, error)
+
}
+
+
func (b *Backend) PumpRepos(ctx context.Context) error {
+
sl := slog.With("source", "pumpRepos")
+
+
if err := pumpRepos(ctx, b); err != nil {
+
sl.Error("failed pumping repos", "err", err)
+
}
+
+
return nil
+
}
+
+
func pumpRepos(ctx context.Context, backend *Backend) error {
+
sl := slog.With("source", "pumpRepos")
+
bf := backend.bf
+
+
xrpcc := &xrpc.Client{
+
Host: "https://bsky.network",
+
}
+
+
jmstore, ok := bf.Store.(jobMaker)
+
if !ok {
+
return fmt.Errorf("configured job store doesn't support random job creation")
+
}
+
+
cursor, err := backend.LoadCursor("repos")
+
if err != nil {
+
sl.Error("failed to load repos cursor", "err", err)
+
}
+
curs, _ := cursor.(string)
+
+
for {
+
select {
+
case <-ctx.Done():
+
sl.Info("stopping repo pump")
+
return nil
+
default:
+
}
+
+
sl.Info("listing repos", "cursor", curs)
+
res, err := atproto.SyncListRepos(ctx, xrpcc, curs, 1000)
+
if err != nil {
+
return fmt.Errorf("error listing repos: %w", err)
+
}
+
+
for _, repo := range res.Repos {
+
_, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
+
if err != nil {
+
sl.Warn("failed to create backfill job", "err", err)
+
continue
+
}
+
}
+
+
if res.Cursor != nil && *res.Cursor != "" {
+
cursor = *res.Cursor
+
} else {
+
break
+
}
+
}
+
+
return nil
+
}