an app.bsky.* indexer

add relay-host flag

Changed files
+22 -14
cmd
+8 -5
cmd/monarch/backfill.go
···
package main
-
import "github.com/bluesky-social/indigo/backfill"
+
import (
+
"github.com/bluesky-social/indigo/backfill"
+
"github.com/urfave/cli/v2"
+
)
-
func NewBackfillService(store backfill.Store, h *HandlerService, workers int, consumers int) *backfill.Backfiller {
+
func NewBackfillService(store backfill.Store, h *HandlerService, cctx *cli.Context) *backfill.Backfiller {
opts := &backfill.BackfillOptions{
-
ParallelBackfills: workers,
-
ParallelRecordCreates: consumers,
+
ParallelBackfills: cctx.Int("backfill-workers"),
+
ParallelRecordCreates: cctx.Int("backfill-consumers"),
NSIDFilter: "",
SyncRequestsPerSecond: 10,
-
RelayHost: "https://bsky.network",
+
RelayHost: "https://" + cctx.String("relay-host"),
}
return backfill.NewBackfiller("backfiller", store, h.HandleCreate, h.HandleUpdate, h.HandleDelete, opts)
+3 -2
cmd/monarch/census.go
···
"github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/backfill"
"github.com/bluesky-social/indigo/xrpc"
+
"github.com/urfave/cli/v2"
)
type CensusService struct {
···
}
}
-
func (cs *CensusService) Start(ctx context.Context) {
+
func (cs *CensusService) Start(ctx context.Context, cctx *cli.Context) {
xrpcc := &xrpc.Client{
-
Host: "https://bsky.network",
+
Host: "https://" + cctx.String("relay-host"),
}
jmstore, ok := cs.backfill.Store.(jobMaker)
+4 -2
cmd/monarch/firehose.go
···
import (
"context"
+
"fmt"
"net/http"
"github.com/gorilla/websocket"
+
"github.com/urfave/cli/v2"
)
-
func NewFirehoseConnection(ctx context.Context, cursorSvc *CursorService) (*websocket.Conn, error) {
-
url := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
+
func NewFirehoseConnection(ctx context.Context, cctx *cli.Context, cursorSvc *CursorService) (*websocket.Conn, error) {
+
url := fmt.Sprintf("wss://%s/xrpc/com.atproto.sync.subscribeRepos", cctx.String("relay-host"))
curs, _ := cursorSvc.Get("firehose")
if curs != "" {
url += "?cursor=" + curs
+7 -5
cmd/monarch/main.go
···
app.handler = NewHandlerService(app.content)
-
workers := cctx.Int("backfill-workers")
-
consumers := cctx.Int("backfill-consumers")
-
app.backfill = NewBackfillService(backfill.NewGormstore(app.state), app.handler, workers, consumers)
+
app.backfill = NewBackfillService(backfill.NewGormstore(app.state), app.handler, cctx)
go app.backfill.Start()
app.census = NewCensusService(app.cursor, app.backfill)
-
go app.census.Start(ctx)
+
go app.census.Start(ctx, cctx)
-
wsconn, err := NewFirehoseConnection(ctx, app.cursor)
+
wsconn, err := NewFirehoseConnection(ctx, cctx, app.cursor)
if err != nil {
return fmt.Errorf("error connecting to relay: %w", err)
}
···
&cli.StringFlag{
Name: "content-db-url",
Value: "sqlite://./content.db",
+
},
+
&cli.StringFlag{
+
Name: "relay-host",
+
Value: "bsky.network",
},
&cli.IntFlag{
Name: "max-db-connections",