an app.bsky.* indexer

rework inits

Changed files
+28 -17
cmd
+1 -2
cmd/backfiller/backend.go
···
reposSeq string
}
-
func NewBackend(state *gorm.DB, bf *backfill.Backfiller) *Backend {
return &Backend{
state: state,
-
bf: bf,
}
}
···
reposSeq string
}
+
func NewBackend(state *gorm.DB) *Backend {
return &Backend{
state: state,
}
}
+11 -4
cmd/backfiller/handlers.go
···
"github.com/ipfs/go-cid"
)
-
func (b *Backend) RepoCommitHandler(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
select {
case <-ctx.Done():
return nil
···
}
}
-
func handleCreate(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error {
return nil
}
-
func handleUpdate(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error {
return nil
}
-
func handleDelete(ctx context.Context, repo, rev, path string) error {
return nil
}
···
"github.com/ipfs/go-cid"
)
+
type commitHandler func(context.Context, *comatproto.SyncSubscribeRepos_Commit) error
+
+
func (b *Backend) RepoCommitHandler(
+
ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit,
+
) error {
select {
case <-ctx.Done():
return nil
···
}
}
+
type handleOpCreateUpdate func(context.Context, string, string, string, *[]byte, *cid.Cid) error
+
type handleOpDelete func(context.Context, string, string, string) error
+
+
func (b *Backend) HandleCreateOp(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error {
return nil
}
+
func (b *Backend) HandleUpdateOp(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error {
return nil
}
+
func (b *Backend) HandleDeleteOp(ctx context.Context, repo, rev, path string) error {
return nil
}
+16 -11
cmd/backfiller/main.go
···
"syscall"
"time"
-
"github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/backfill"
"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/events/schedulers/parallel"
···
return db
}
-
func NewBackfiller(db *gorm.DB) *backfill.Backfiller {
opts := &backfill.BackfillOptions{
// ParallelBackfills: 50,
// ParallelRecordCreates: 25,
···
return backfill.NewBackfiller(
"backfills",
backfill.NewGormstore(db),
-
handleCreate,
-
handleUpdate,
-
handleDelete,
opts,
)
}
···
return conn
}
-
func NewScheduler(ctx context.Context, backend *Backend) *parallel.Scheduler {
rsc := events.RepoStreamCallbacks{
-
RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
-
return backend.RepoCommitHandler(ctx, evt)
},
}
···
streamCtx, streamCancel := context.WithCancel(context.Background())
db := NewDatabase()
-
bf := NewBackfiller(db)
go bf.Start()
-
backend := NewBackend(db, bf)
go backend.SyncCursors(streamCtx)
cursor, err := backend.LoadCursor("firehose")
···
}
conn := NewFirehose(streamCtx, cursor)
-
sched := NewScheduler(streamCtx, backend)
go func() {
if err := events.HandleRepoStream(streamCtx, conn, sched, sl); err != nil {
sl.Error("failed to start scheduler", "err", err)
···
"syscall"
"time"
+
comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/backfill"
"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/events/schedulers/parallel"
···
return db
}
+
func NewBackfiller(
+
db *gorm.DB, create handleOpCreateUpdate, update handleOpCreateUpdate, delete handleOpDelete,
+
) *backfill.Backfiller {
opts := &backfill.BackfillOptions{
// ParallelBackfills: 50,
// ParallelRecordCreates: 25,
···
return backfill.NewBackfiller(
"backfills",
backfill.NewGormstore(db),
+
create, update, delete,
opts,
)
}
···
return conn
}
+
func NewScheduler(
+
ctx context.Context, commitCallback commitHandler,
+
) *parallel.Scheduler {
rsc := events.RepoStreamCallbacks{
+
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
+
return commitCallback(ctx, evt)
},
}
···
streamCtx, streamCancel := context.WithCancel(context.Background())
db := NewDatabase()
+
backend := NewBackend(db)
+
bf := NewBackfiller(db, backend.HandleCreateOp, backend.HandleUpdateOp, backend.HandleDeleteOp)
go bf.Start()
+
// attach the backfiller to the backend so pump and repo commit handler can use it
+
backend.bf = bf
+
go backend.SyncCursors(streamCtx)
cursor, err := backend.LoadCursor("firehose")
···
}
conn := NewFirehose(streamCtx, cursor)
+
sched := NewScheduler(streamCtx, backend.RepoCommitHandler)
go func() {
if err := events.HandleRepoStream(streamCtx, conn, sched, sl); err != nil {
sl.Error("failed to start scheduler", "err", err)