an app.bsky.* indexer

Compare changes

Choose any two refs to compare.

Changed files
+6 -17
cmd
+1 -4
cmd/monarch/cursors.go
···
import (
"context"
"log/slog"
-
"sync"
"time"
"gorm.io/gorm"
)
type CursorService struct {
-
store *gorm.DB
-
-
firehoseLk sync.Mutex
+
store *gorm.DB
firehoseSeq int64
}
+3 -1
cmd/monarch/firehose.go
···
func NewFirehoseConnection(ctx context.Context, cctx *cli.Context, cursorSvc *CursorService) (*websocket.Conn, error) {
url := fmt.Sprintf("wss://%s/xrpc/com.atproto.sync.subscribeRepos", cctx.String("relay-host"))
curs, err := cursorSvc.GetFirehoseCursor()
-
if err == nil { // reversed
+
if err != nil {
+
slog.Error("error getting firehose cursor", "err", err)
+
} else if curs > 0 {
url += fmt.Sprintf("?cursor=%d", curs)
}
+2 -12
cmd/monarch/census.go
···
"time"
"github.com/bluesky-social/indigo/api/atproto"
-
"github.com/bluesky-social/indigo/backfill"
+
backfill "github.com/bluesky-social/indigo/backfill/next"
"github.com/bluesky-social/indigo/xrpc"
"github.com/urfave/cli/v2"
"golang.org/x/sync/semaphore"
···
seenHosts map[string]bool
}
-
type jobMaker interface {
-
GetOrCreateJob(context.Context, string, string) (backfill.Job, error)
-
}
-
func NewCensusService(cursorSvc *CursorService, backfillSvc *backfill.Backfiller) *CensusService {
return &CensusService{
cursor: cursorSvc,
···
Host: "https://" + host,
}
-
jmstore, ok := cs.backfill.Store.(jobMaker)
-
if !ok {
-
slog.Error("configured job store doesn't support random job creation")
-
return
-
}
-
hcur, err := cs.cursor.GetHostCursor(host)
if err != nil {
slog.Error("error fetching host cursor", "err", err)
···
}
for _, repo := range res.Repos {
-
_, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
+
err := cs.backfill.EnqueueJob(ctx, host, repo.Did)
if err != nil {
slog.Error("error adding repo to backfiller", "err", err)
} else {