an app.bsky.* indexer

Enqueue from firehose

Changed files
+49 -21
cmd
backfiller
+40 -21
cmd/backfiller/backfiller.go
···
import (
"context"
-
"log/slog"
+
"net/http"
+
// "strings"
+
// "log/slog"
+
"log"
"os"
"os/signal"
-
"strings"
"syscall"
+
comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/backfill"
+
"github.com/bluesky-social/indigo/events"
+
"github.com/bluesky-social/indigo/events/schedulers/parallel"
+
+
"github.com/gorilla/websocket"
"github.com/ipfs/go-cid"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
// handleCreate is the record-create callback passed to backfill.NewBackfiller.
//
// NOTE(review): this listing appears to be a rendered diff, where a line that
// follows a lone "-" was removed and a line that follows a lone "+" was added.
// Read that way:
//   - removed: the early return for any path without the
//     "com.whtwnd.blog.entry" prefix, and the slog.Info("create", ...) message;
//   - added: a log.Printf of the at://<repo>/<path> URI on every call.
//
// None of the visible lines read ctx, rev, rec or cid, and every visible
// return yields nil.
func handleCreate(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error {
-
if !strings.HasPrefix(path, "com.whtwnd.blog.entry") {
-
return nil
-
}
-
slog.Info("create", "path", path)
+
log.Printf("handleCreate: at://%s/%s", repo, path)
return nil
}
···
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
defer stop()
-
// db, err := gorm.Open(sqlite.Open("backfiller.db"), &gorm.Config{})
-
db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
+
db, err := gorm.Open(sqlite.Open("backfiller.db"), &gorm.Config{})
if err != nil {
-
panic("failed to connect database")
+
log.Fatalf("failed to connect database: %s", err)
}
db.AutoMigrate(&backfill.GormDBJob{})
+
store := backfill.NewGormstore(db)
-
store := backfill.NewGormstore(db)
-
store.EnqueueJob(ctx, "did:plc:4nsduwlpivpuur4mqkbfvm6a")
+
// connect to the relay
+
d := websocket.DefaultDialer
+
con, _, err := d.Dial("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos", http.Header{})
+
if err != nil {
+
log.Fatalf("failed to connect to relay: %s", err)
+
}
-
opts := backfill.DefaultBackfillOptions()
-
opts.NSIDFilter = "com.whtwnd.blog.entry"
+
rsc := events.RepoStreamCallbacks{
+
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
+
// log.Printf("enqueuing: %s", evt.Repo)
+
return store.EnqueueJobWithState(ctx, evt.Repo, backfill.StateEnqueued)
+
},
+
}
-
bf := backfill.NewBackfiller(
-
"backfill-feeds",
-
store,
-
handleCreate,
-
handleUpdate,
-
handleDelete,
-
opts,
-
)
+
// start backfilling
+
opts := &backfill.BackfillOptions{
+
ParallelBackfills: 10,
+
ParallelRecordCreates: 100,
+
NSIDFilter: "app.bsky.labeler.service",
+
SyncRequestsPerSecond: 10,
+
RelayHost: "https://bsky.network",
+
}
+
bf := backfill.NewBackfiller("backfiller-labels", store, handleCreate, handleUpdate, handleDelete, opts)
go bf.Start()
+
+
sched := parallel.NewScheduler(4, 1000, "backfiller-labels", rsc.EventHandler)
+
if err := events.HandleRepoStream(ctx, con, sched, nil); err != nil {
+
log.Fatalf("failed to start scheduler: %s", err)
+
}
+
<-ctx.Done()
bf.Stop(ctx)
}
+3
go.mod
···
require (
github.com/bluesky-social/indigo v0.0.0-20250716175339-eebba3d96129
+
github.com/gorilla/websocket v1.5.1
github.com/ipfs/go-cid v0.4.1
gorm.io/driver/sqlite v1.5.5
gorm.io/gorm v1.25.9
)
require (
+
github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/carlmjohnson/versioninfo v0.22.5 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
···
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
+
golang.org/x/net v0.23.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.14.0 // indirect
+6
go.sum
···
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+
github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b h1:5/++qT1/z812ZqBvqQt6ToRswSuPZ/B33m6xVHRzADU=
+
github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b/go.mod h1:4+EPqMRApwwE/6yo6CxiHoSnBzjRr3jsqer7frxP8y4=
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 h1:iW0a5ljuFxkLGPNem5Ui+KBjFJzKg4Fv2fnxe4dvzpM=
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5/go.mod h1:Y2QMoi1vgtOIfc+6DhrMOGkLoGzqSV2rKp4Sm+opsyA=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
···
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
+
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
+
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0=
github.com/gocql/gocql v1.7.0 h1:O+7U7/1gSN7QTEAaMEsJc1Oq2QHXvCWoF3DFK9HDHus=
github.com/gocql/gocql v1.7.0/go.mod h1:vnlvXyFZeLBF0Wy+RS8hrOdbn0UWsWtdg07XJnFxZ+4=
···
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
+
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
+
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=