an app.bsky.* indexer

Compare changes

Choose any two refs to compare.

Changed files
+8 -25
cmd
+2
go.sum
···
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
+
github.com/bluesky-social/indigo v0.0.0-20250808182429-6f0837c2d12b h1:bJTlFwMhB9sluuqZxVXtv2yFcaWOC/PZokz9mcwb4u4=
+
github.com/bluesky-social/indigo v0.0.0-20250808182429-6f0837c2d12b/go.mod h1:0XUyOCRtL4/OiyeqMTmr6RlVHQMDgw3LS7CfibuZR5Q=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 h1:N7oVaKyGp8bttX0bfZGmcGkjz7DLQXhAn3DNd3T0ous=
+1 -1
go.mod
···
github.com/gorilla/websocket v1.5.1
github.com/ipfs/go-cid v0.4.1
github.com/urfave/cli/v2 v2.25.7
+
golang.org/x/sync v0.7.0
gorm.io/gorm v1.25.9
)
···
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/net v0.23.0 // indirect
-
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
+4 -20
cmd/monarch/census.go
···
)
type CensusService struct {
-
cursor *CursorService
-
backfill *backfill.Backfiller
-
+
cursor *CursorService
+
backfill *backfill.Backfiller
seenHosts map[string]bool
-
seenLk sync.Mutex
-
-
storeLk sync.Mutex
}
type jobMaker interface {
···
for _, host := range res.Hosts {
// don't reprocess hosts already handled
-
cs.seenLk.Lock()
-
_, ok := cs.seenHosts[host.Hostname]
-
cs.seenLk.Unlock()
-
if ok {
+
seen := cs.seenHosts[host.Hostname]
+
if seen {
slog.Info("already processed host, skipping", "host", host)
continue
}
···
return
}
-
cs.storeLk.Lock()
hcur, err := cs.cursor.GetHostCursor(host)
if err != nil {
slog.Error("error fetching host cursor", "err", err)
}
-
cs.storeLk.Unlock()
var added int
curs := hcur.Cursor
···
continue
}
-
cs.storeLk.Lock()
for _, repo := range res.Repos {
_, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
if err != nil {
···
added += 1
}
}
-
cs.storeLk.Unlock()
if res.Cursor != nil && *res.Cursor != "" {
curs = *res.Cursor
-
cs.storeLk.Lock()
if err := cs.cursor.SetHostCursor(host, curs); err != nil {
slog.Error("error updating cursor for host", "err", err)
}
-
cs.storeLk.Unlock()
} else {
break
}
}
slog.Info("finished listing repos", "host", host)
-
-
cs.seenLk.Lock()
-
defer cs.seenLk.Unlock()
-
cs.seenHosts[host] = true
}
+1 -4
cmd/monarch/cursors.go
···
import (
"context"
"log/slog"
-
"sync"
"time"
"gorm.io/gorm"
)
type CursorService struct {
-
store *gorm.DB
-
-
firehoseLk sync.Mutex
+
store *gorm.DB
firehoseSeq int64
}