an app.bsky.* indexer

Compare changes

Choose any two refs to compare.

Changed files
+12 -18
cmd
+1 -4
cmd/monarch/cursors.go
···
import (
"context"
"log/slog"
-
"sync"
"time"
"gorm.io/gorm"
)
type CursorService struct {
-
store *gorm.DB
-
-
firehoseLk sync.Mutex
+
store *gorm.DB
firehoseSeq int64
}
+6 -9
cmd/monarch/backfill.go
···
package main
import (
-
"github.com/bluesky-social/indigo/backfill"
+
backfill "github.com/bluesky-social/indigo/backfill/next"
"github.com/urfave/cli/v2"
)
func NewBackfillService(store backfill.Store, h *HandlerService, cctx *cli.Context) *backfill.Backfiller {
-
opts := &backfill.BackfillOptions{
-
ParallelBackfills: cctx.Int("backfill-workers"),
-
ParallelRecordCreates: cctx.Int("backfill-consumers"),
-
NSIDFilter: "",
-
SyncRequestsPerSecond: cctx.Int("sync-requests-limit"),
-
RelayHost: "https://" + cctx.String("relay-host"),
-
}
+
opts := backfill.DefaultBackfillerOptions()
+
opts.PerPDSBackfillConcurrency = cctx.Int("backfill-workers")
+
opts.GlobalRecordCreateConcurrency = cctx.Int("backfill-consumers")
+
opts.PerPDSSyncsPerSecond = cctx.Float64("sync-requests-limit")
-
return backfill.NewBackfiller("backfiller", store, h.HandleCreate, h.HandleUpdate, h.HandleDelete, opts)
+
return backfill.NewBackfiller("backfiller", store, h.HandleCreate, opts)
}
+5 -5
cmd/monarch/main.go
···
"time"
comatproto "github.com/bluesky-social/indigo/api/atproto"
-
"github.com/bluesky-social/indigo/backfill"
+
backfill "github.com/bluesky-social/indigo/backfill/next"
"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/events/schedulers/parallel"
"github.com/bluesky-social/indigo/util/cliutil"
···
app.handler = NewHandlerService(app.content)
app.backfill = NewBackfillService(backfill.NewGormstore(app.state), app.handler, cctx)
-
go app.backfill.Start()
app.census = NewCensusService(app.cursor, app.backfill)
go app.census.Start(ctx, cctx)
···
rsc := events.RepoStreamCallbacks{
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
app.cursor.SetFirehoseCursor(evt.Seq)
-
return app.backfill.HandleEvent(ctx, evt)
+
// return app.backfill.HandleEvent(ctx, evt)
+
return nil
},
// TODO account
// TODO identity
···
return nil
}
-
if err := app.backfill.Stop(ctx); err != nil {
+
if err := app.backfill.Shutdown(ctx); err != nil {
slog.Error("error stopping backfiller", "err", err)
}
···
Name: "max-repo-crawlers",
Value: 4,
},
-
&cli.IntFlag{
+
&cli.Float64Flag{
Name: "sync-requests-limit",
Value: 10, // ratelimit-policy: 3000;w=300
},