an app.bsky.* indexer

Add gorm logger, pump repos

Changed files
+92 -24
cmd
backfiller
+89 -21
cmd/backfiller/backfiller.go
···
import (
"context"
+
"errors"
+
"fmt"
+
"log"
"net/http"
-
// "strings"
-
// "log/slog"
-
"log"
"os"
"os/signal"
"syscall"
+
"time"
+
"github.com/bluesky-social/indigo/api/atproto"
comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/backfill"
"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/events/schedulers/parallel"
+
"github.com/bluesky-social/indigo/xrpc"
"github.com/gorilla/websocket"
"github.com/ipfs/go-cid"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
+
"gorm.io/gorm/logger"
)
func handleCreate(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error {
-
log.Printf("handleCreate: at://%s/%s", repo, path)
return nil
}
···
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
defer stop()
-
db, err := gorm.Open(sqlite.Open("backfiller.db"), &gorm.Config{})
+
newLogger := logger.New(
+
log.New(os.Stdout, "\n", log.LstdFlags),
+
logger.Config{
+
SlowThreshold: 5*time.Second,
+
Colorful: false,
+
},
+
)
+
+
db, err := gorm.Open(sqlite.Open("state.db"), &gorm.Config{
+
Logger: newLogger,
+
})
if err != nil {
log.Fatalf("failed to connect database: %s", err)
}
···
store := backfill.NewGormstore(db)
// connect to the relay
-
d := websocket.DefaultDialer
-
con, _, err := d.Dial("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos", http.Header{})
+
con, _, err := websocket.DefaultDialer.Dial("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos", http.Header{
+
"User-Agent": []string{"backfiller/0.1 (@edavis.dev)"},
+
})
if err != nil {
log.Fatalf("failed to connect to relay: %s", err)
}
-
rsc := events.RepoStreamCallbacks{
-
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
-
// log.Printf("enqueuing: %s", evt.Repo)
-
return store.EnqueueJobWithState(ctx, evt.Repo, backfill.StateEnqueued)
-
},
-
}
-
// start backfilling
opts := &backfill.BackfillOptions{
-
ParallelBackfills: 10,
+
ParallelBackfills: 10,
ParallelRecordCreates: 100,
-
NSIDFilter: "app.bsky.labeler.service",
-
SyncRequestsPerSecond: 10,
-
RelayHost: "https://bsky.network",
+
SyncRequestsPerSecond: 4,
+
RelayHost: "https://bsky.network",
}
-
bf := backfill.NewBackfiller("backfiller-labels", store, handleCreate, handleUpdate, handleDelete, opts)
+
bf := backfill.NewBackfiller("backfills", store, handleCreate, handleUpdate, handleDelete, opts)
go bf.Start()
-
sched := parallel.NewScheduler(4, 1000, "backfiller-labels", rsc.EventHandler)
+
// pump repos
+
go func(bf *backfill.Backfiller) {
+
if err := pumpRepos(context.TODO(), bf); err != nil {
+
log.Printf("failed pumping repos: %s", err)
+
}
+
}(bf)
+
+
// read from the firehose
+
rsc := events.RepoStreamCallbacks{
+
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
+
job, err := bf.Store.GetJob(ctx, evt.Repo)
+
if job == nil {
+
if errors.Is(err, backfill.ErrJobNotFound) {
+
return nil
+
} else {
+
return fmt.Errorf("error getting job: %w", err)
+
}
+
} else {
+
return bf.HandleEvent(ctx, evt)
+
}
+
},
+
}
+
sched := parallel.NewScheduler(16, 100, "firehose", rsc.EventHandler)
if err := events.HandleRepoStream(ctx, con, sched, nil); err != nil {
log.Fatalf("failed to start scheduler: %s", err)
}
<-ctx.Done()
-
bf.Stop(ctx)
+
bf.Stop(context.TODO())
+
}
+
+
type jobMaker interface {
+
GetOrCreateJob(context.Context, string, string) (backfill.Job, error)
+
}
+
+
func pumpRepos(ctx context.Context, bf *backfill.Backfiller) error {
+
xrpcc := &xrpc.Client{
+
Host: "https://bsky.network",
+
}
+
+
jmstore, ok := bf.Store.(jobMaker)
+
if !ok {
+
return fmt.Errorf("configured job store doesn't support random job creation")
+
}
+
+
var curs string
+
for {
+
log.Printf("listing repos with cursor = %v", curs)
+
res, err := atproto.SyncListRepos(ctx, xrpcc, curs, 1000)
+
if err != nil {
+
return fmt.Errorf("error listing repos: %w", err)
+
}
+
+
for _, repo := range res.Repos {
+
_, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
+
if err != nil {
+
log.Printf("failed to create backfill job: %s", err)
+
continue
+
}
+
}
+
+
if res.Cursor != nil && *res.Cursor != "" {
+
curs = *res.Cursor
+
} else {
+
break
+
}
+
}
+
+
return nil
}
+1 -1
go.mod
···
toolchain go1.24.5
require (
-
github.com/bluesky-social/indigo v0.0.0-20250716175339-eebba3d96129
+
github.com/bluesky-social/indigo v0.0.0-20250724221105-5827c8fb61bb
github.com/gorilla/websocket v1.5.1
github.com/ipfs/go-cid v0.4.1
gorm.io/driver/sqlite v1.5.5
+2 -2
go.sum
···
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
-
github.com/bluesky-social/indigo v0.0.0-20250716175339-eebba3d96129 h1:81032NtsXNDdD8St+XHEdFlbRukJDKlS7XkGhrdZKNk=
-
github.com/bluesky-social/indigo v0.0.0-20250716175339-eebba3d96129/go.mod h1:0XUyOCRtL4/OiyeqMTmr6RlVHQMDgw3LS7CfibuZR5Q=
+
github.com/bluesky-social/indigo v0.0.0-20250724221105-5827c8fb61bb h1:BqMNDZMfXwiRTJ6NvQotJ0qInn37JH5U8E+TF01CFHQ=
+
github.com/bluesky-social/indigo v0.0.0-20250724221105-5827c8fb61bb/go.mod h1:0XUyOCRtL4/OiyeqMTmr6RlVHQMDgw3LS7CfibuZR5Q=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/carlmjohnson/versioninfo v0.22.5 h1:O00sjOLUAFxYQjlN/bzYTuZiS0y6fWDQjMRvwtKgwwc=