an app.bsky.* indexer

Compare changes

Choose any two refs to compare.

Changed files
+27 -26
cmd
+20
cmd/monarch/handlers.go
···
"context"
"encoding/json"
"fmt"
+
"log/slog"
appbsky "github.com/bluesky-social/indigo/api/bsky"
"github.com/bluesky-social/indigo/atproto/syntax"
···
var out appbsky.LabelerService
out.UnmarshalCBOR(bytes.NewReader(*rec))
body, err = json.Marshal(out)
+
+
case syntax.NSID("app.bsky.graph.list"):
+
var out appbsky.GraphList
+
out.UnmarshalCBOR(bytes.NewReader(*rec))
+
body, err = json.Marshal(out)
+
+
case syntax.NSID("app.bsky.graph.verification"):
+
var out appbsky.GraphVerification
+
out.UnmarshalCBOR(bytes.NewReader(*rec))
+
body, err = json.Marshal(out)
+
+
case syntax.NSID("app.bsky.graph.starterpack"):
+
var out appbsky.GraphStarterpack
+
out.UnmarshalCBOR(bytes.NewReader(*rec))
+
body, err = json.Marshal(out)
+
+
default:
+
slog.Error("tracked collection missing handler", "collection", uri.Collection())
+
return nil
}
switch action {
+1 -1
cmd/monarch/backfill.go
···
ParallelBackfills: cctx.Int("backfill-workers"),
ParallelRecordCreates: cctx.Int("backfill-consumers"),
NSIDFilter: "",
-
SyncRequestsPerSecond: 10,
+
SyncRequestsPerSecond: cctx.Int("sync-requests-limit"),
RelayHost: "https://" + cctx.String("relay-host"),
}
+1 -1
cmd/monarch/main.go
···
},
&cli.IntFlag{
Name: "sync-requests-limit",
-
Value: 30,
+
Value: 10, // ratelimit-policy: 3000;w=300
},
}
+4 -20
cmd/monarch/census.go
···
)
type CensusService struct {
-
cursor *CursorService
-
backfill *backfill.Backfiller
-
+
cursor *CursorService
+
backfill *backfill.Backfiller
seenHosts map[string]bool
-
seenLk sync.Mutex
-
-
storeLk sync.Mutex
}
type jobMaker interface {
···
for _, host := range res.Hosts {
// don't reprocess hosts already handled
-
cs.seenLk.Lock()
-
_, ok := cs.seenHosts[host.Hostname]
-
cs.seenLk.Unlock()
-
if ok {
+
seen := cs.seenHosts[host.Hostname]
+
if seen {
slog.Info("already processed host, skipping", "host", host)
continue
}
···
return
}
-
cs.storeLk.Lock()
hcur, err := cs.cursor.GetHostCursor(host)
if err != nil {
slog.Error("error fetching host cursor", "err", err)
}
-
cs.storeLk.Unlock()
var added int
curs := hcur.Cursor
···
continue
}
-
cs.storeLk.Lock()
for _, repo := range res.Repos {
_, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
if err != nil {
···
added += 1
}
}
-
cs.storeLk.Unlock()
if res.Cursor != nil && *res.Cursor != "" {
curs = *res.Cursor
-
cs.storeLk.Lock()
if err := cs.cursor.SetHostCursor(host, curs); err != nil {
slog.Error("error updating cursor for host", "err", err)
}
-
cs.storeLk.Unlock()
} else {
break
}
}
slog.Info("finished listing repos", "host", host)
-
-
cs.seenLk.Lock()
-
defer cs.seenLk.Unlock()
-
cs.seenHosts[host] = true
}
+1 -4
cmd/monarch/cursors.go
···
import (
"context"
"log/slog"
-
"sync"
"time"
"gorm.io/gorm"
)
type CursorService struct {
-
store *gorm.DB
-
-
firehoseLk sync.Mutex
+
store *gorm.DB
firehoseSeq int64
}