an app.bsky.* indexer

enqueue jobs from the firehose

Changed files
+2 -21
cmd
-2
cmd/backfiller/backend.go
···
data *gorm.DB
bf *backfill.Backfiller
-
backfillComplete bool
-
firehoseLk sync.Mutex
firehoseSeq string
reposLk sync.Mutex
+2 -17
cmd/backfiller/handlers.go
···
import (
"bytes"
"context"
-
"errors"
"fmt"
"log/slog"
"strconv"
···
comatproto "github.com/bluesky-social/indigo/api/atproto"
appbsky "github.com/bluesky-social/indigo/api/bsky"
-
"github.com/bluesky-social/indigo/backfill"
"github.com/ipfs/go-cid"
)
···
b.firehoseSeq = strconv.Itoa(int(evt.Seq))
b.firehoseLk.Unlock()
-
if b.backfillComplete {
-
return b.bf.HandleEvent(ctx, evt)
-
}
-
-
job, err := b.bf.Store.GetJob(ctx, evt.Repo)
-
if job == nil {
-
if errors.Is(err, backfill.ErrJobNotFound) {
-
return nil
-
} else {
-
return fmt.Errorf("error getting job: %w", err)
-
}
-
} else {
-
return b.bf.HandleEvent(ctx, evt)
-
}
+
return b.bf.HandleEvent(ctx, evt)
}
type handleOpCreateUpdate func(context.Context, string, string, string, *[]byte, *cid.Cid) error
···
return nil
}
-
sl := slog.With("source", "commitHandler")
+
sl := slog.With("source", "HandleCreateOp")
var out appbsky.FeedGenerator
if err := out.UnmarshalCBOR(bytes.NewReader(*rec)); err != nil {
-2
cmd/backfiller/pump.go
···
}
}
-
sl.Info("finished listing repos, switching over to event stream")
-
b.backfillComplete = true
return nil
}