an app.bsky.* indexer

Merge branch 'DI'

-26
cmd/backfiller/backend.go
···
-
package main
-
-
import (
-
"sync"
-
-
"github.com/bluesky-social/indigo/backfill"
-
"gorm.io/gorm"
-
)
-
-
type Backend struct {
-
state *gorm.DB
-
data *gorm.DB
-
bf *backfill.Backfiller
-
-
firehoseLk sync.Mutex
-
firehoseSeq string
-
reposLk sync.Mutex
-
reposSeq string
-
}
-
-
func NewBackend(state, data *gorm.DB) *Backend {
-
return &Backend{
-
state: state,
-
data: data,
-
}
-
}
+15
cmd/backfiller/backfill.go
···
+
package main
+
+
import "github.com/bluesky-social/indigo/backfill"
+
+
func NewBackfillService(store backfill.Store, h *HandlerService) *backfill.Backfiller {
+
opts := &backfill.BackfillOptions{
+
ParallelBackfills: 10,
+
ParallelRecordCreates: 1,
+
NSIDFilter: "",
+
SyncRequestsPerSecond: 5,
+
RelayHost: "https://bsky.network",
+
}
+
+
return backfill.NewBackfiller("backfiller", store, h.HandleCreate, h.HandleUpdate, h.HandleDelete, opts)
+
}
+71
cmd/backfiller/census.go
···
+
package main
+
+
import (
+
"context"
+
"log/slog"
+
+
"github.com/bluesky-social/indigo/api/atproto"
+
"github.com/bluesky-social/indigo/backfill"
+
"github.com/bluesky-social/indigo/xrpc"
+
)
// CensusService enumerates every repo hosted on the relay and enqueues a
// backfill job for each one, recording its listing position through the
// cursor service so a restart can resume the walk.
type CensusService struct {
	cursor   *CursorService
	backfill *backfill.Backfiller
}
// jobMaker is the subset of the backfill store needed to enqueue a job for
// an arbitrary DID. The generic backfill.Store interface doesn't expose
// this; the concrete gorm-backed store used in Start is expected to
// satisfy it (checked with a type assertion).
type jobMaker interface {
	GetOrCreateJob(context.Context, string, string) (backfill.Job, error)
}
+
+
func NewCensusService(cursorSvc *CursorService, backfillSvc *backfill.Backfiller) *CensusService {
+
return &CensusService{
+
cursor: cursorSvc,
+
backfill: backfillSvc,
+
}
+
}
+
+
func (cs *CensusService) Start(ctx context.Context) {
+
xrpcc := &xrpc.Client{
+
Host: "https://bsky.network",
+
}
+
+
jmstore, ok := cs.backfill.Store.(jobMaker)
+
if !ok {
+
slog.Error("configured job store doesn't support random job creation")
+
return
+
}
+
+
curs, _ := cs.cursor.Get("repos")
+
for {
+
select {
+
case <-ctx.Done():
+
slog.Info("stopping repo census")
+
return
+
default:
+
}
+
+
slog.Info("listing repos", "cursor", curs)
+
res, err := atproto.SyncListRepos(ctx, xrpcc, curs, 1000)
+
if err != nil {
+
slog.Error("error listing repos", "err", err)
+
return
+
}
+
+
for _, repo := range res.Repos {
+
_, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
+
if err != nil {
+
slog.Error("error adding listed repo to backfiller", "err", err)
+
}
+
}
+
+
if res.Cursor != nil && *res.Cursor != "" {
+
cs.cursor.reposLk.Lock()
+
curs = *res.Cursor
+
cs.cursor.reposSeq = curs
+
cs.cursor.reposLk.Unlock()
+
} else {
+
break
+
}
+
}
+
}
+74 -31
cmd/backfiller/cursors.go
···
import (
"context"
-
"errors"
"fmt"
"log/slog"
+
"strconv"
+
"sync"
"time"
"gorm.io/gorm"
)
// cursorRecord is one persisted cursor row in the state database. Rows are
// addressed by a fixed primary-key ID (1 = "firehose", 2 = "repos"),
// seeded by NewCursorService.
type cursorRecord struct {
	ID  uint `gorm:"primaryKey"`
	Key string
	Val string
}
// CursorService persists stream/listing positions in the state database so
// the process can resume where it left off. The in-memory values are
// mutex-guarded because they are written from the firehose and census
// goroutines while the checkpointer reads them.
type CursorService struct {
	store *gorm.DB

	firehoseLk  sync.Mutex
	firehoseSeq string // latest firehose sequence number, as a decimal string

	reposLk  sync.Mutex
	reposSeq string // latest listRepos page cursor
}
+
+
func NewCursorService(store *gorm.DB) *CursorService {
+
store.AutoMigrate(&cursorRecord{})
+
+
var rec cursorRecord
+
store.First(&rec, 1)
+
if rec.ID == 0 {
+
store.Create(&cursorRecord{ID: 1, Key: "firehose", Val: ""})
+
}
+
+
store.First(&rec, 2)
+
if rec.ID == 0 {
+
store.Create(&cursorRecord{ID: 2, Key: "repos", Val: ""})
+
}
+
+
return &CursorService{
+
store: store,
+
}
+
}
+
+
func (cs *CursorService) Get(key string) (string, error) {
var rec cursorRecord
-
if err := b.state.Where("key = ?", key).First(&rec).Error; err != nil {
-
if errors.Is(err, gorm.ErrRecordNotFound) {
-
b.state.Create(&cursorRecord{Key: key, Val: ""})
-
}
-
return "", err
+
if err := cs.store.Where("key = ?", key).First(&rec).Error; err != nil {
+
return "", fmt.Errorf("error fetching cursor record: %w", err)
}
return rec.Val, nil
}
-
func (b *Backend) FlushCursors() error {
-
sl := slog.With("source", "flushCursors")
+
func (cs *CursorService) SetFirehoseCursor(seq int64) {
+
cs.firehoseLk.Lock()
+
val := strconv.Itoa(int(seq))
+
cs.firehoseSeq = val
+
cs.firehoseLk.Unlock()
+
}
+
+
func (cs *CursorService) SetReposCursor(value string) {
+
cs.reposLk.Lock()
+
cs.reposSeq = value
+
cs.reposLk.Unlock()
+
}
-
b.firehoseLk.Lock()
-
sl.Info("persisting firehose cursor", "cursor", b.firehoseSeq)
-
if err := b.state.Model(&cursorRecord{}).Where("key = ?", "firehose").Update("val", b.firehoseSeq).Error; err != nil {
-
return fmt.Errorf("failed to persist firehose cursor: %w", err)
+
func (cs *CursorService) Flush() error {
+
flusher := func(lk *sync.Mutex, key, value string) error {
+
lk.Lock()
+
if err := cs.store.Model(&cursorRecord{}).Where("key = ?", key).Update("val", value).Error; err != nil {
+
return fmt.Errorf("error updating cursor record: %+v: %w", cursorRecord{Key: key, Val: value}, err)
+
}
+
lk.Unlock()
+
return nil
}
-
b.firehoseLk.Unlock()
-
b.reposLk.Lock()
-
sl.Info("persisting repos cursor", "cursor", b.reposSeq)
-
if err := b.state.Model(&cursorRecord{}).Where("key = ?", "repos").Update("val", b.reposSeq).Error; err != nil {
-
return fmt.Errorf("failed to persist repos cursor: %w", err)
+
if err := flusher(&cs.firehoseLk, "firehose", cs.firehoseSeq); err != nil {
+
return err
}
-
b.reposLk.Unlock()
+
+
if err := flusher(&cs.reposLk, "repos", cs.reposSeq); err != nil {
+
return err
+
}
return nil
}
-
func (b *Backend) SyncCursors(ctx context.Context) error {
-
for range time.Tick(time.Second * 5) {
+
func (cs *CursorService) CheckpointCursors(ctx context.Context) {
+
t := time.NewTicker(time.Second * 5)
+
defer t.Stop()
+
+
for {
select {
case <-ctx.Done():
-
return nil
-
default:
-
//
+
slog.Info("stopping cursor checkpointer")
+
return
+
case <-t.C:
}
-
if err := b.FlushCursors(); err != nil {
-
slog.Error("failed to flush cursors", "err", err)
-
return fmt.Errorf("failed to flush cursors: %w", err)
+
slog.Info("flushing cursors")
+
if err := cs.Flush(); err != nil {
+
slog.Error("error flushing cursors", "err", err)
+
return
}
}
-
-
return nil
}
-27
cmd/backfiller/database.go
···
package main
import (
-
"fmt"
"log"
"log/slog"
"os"
···
db.Exec("PRAGMA journal_mode=WAL")
return db
}
-
-
func (b *Backend) CleanlyClose() error {
-
closeDb := func(db *gorm.DB) error {
-
if err := db.Exec("PRAGMA wal_checkpoint(TRUNCATE)").Error; err != nil {
-
return fmt.Errorf("failed checkpointing the WAL: %w", err)
-
}
-
rawDb, err := db.DB()
-
if err != nil {
-
return fmt.Errorf("failed getting underlying DB connection: %w", err)
-
}
-
if err := rawDb.Close(); err != nil {
-
return fmt.Errorf("failed closing underlying DB connection: %w", err)
-
}
-
return nil
-
}
-
-
if err := closeDb(b.state); err != nil {
-
return fmt.Errorf("failed to close state database: %w", err)
-
}
-
-
if err := closeDb(b.data); err != nil {
-
return fmt.Errorf("failed to close content database: %w", err)
-
}
-
-
return nil
-
}
+25
cmd/backfiller/firehose.go
···
+
package main
+
+
import (
+
"context"
+
"net/http"
+
+
"github.com/gorilla/websocket"
+
)
+
+
func NewFirehoseConnection(ctx context.Context, cursorSvc *CursorService) (*websocket.Conn, error) {
+
url := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
+
curs, _ := cursorSvc.Get("firehose")
+
if curs != "" {
+
url += "?cursor=" + curs
+
}
+
+
conn, _, err := websocket.DefaultDialer.DialContext(ctx, url, http.Header{
+
"User-Agent": []string{"backfiller/0.1 (@edavis.dev)"},
+
})
+
if err != nil {
+
return nil, err
+
}
+
+
return conn, nil
+
}
+19 -38
cmd/backfiller/handlers.go
···
"bytes"
"context"
"fmt"
-
"log/slog"
-
"strconv"
"strings"
-
comatproto "github.com/bluesky-social/indigo/api/atproto"
appbsky "github.com/bluesky-social/indigo/api/bsky"
"github.com/ipfs/go-cid"
+
"gorm.io/gorm"
)
// HandlerService owns the backfiller's record callbacks and writes indexed
// records to the content database.
type HandlerService struct {
	store *gorm.DB
}
+
+
func NewHandlerService(store *gorm.DB) *HandlerService {
+
store.AutoMigrate(&Profile{})
+
store.AutoMigrate(&Feedgen{})
+
// TODO the rest
-
func (b *Backend) RepoCommitHandler(
-
ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit,
-
) error {
-
select {
-
case <-ctx.Done():
-
return nil
-
default:
-
//
+
return &HandlerService{
+
store: store,
}
-
-
b.firehoseLk.Lock()
-
b.firehoseSeq = strconv.Itoa(int(evt.Seq))
-
b.firehoseLk.Unlock()
-
-
return b.bf.HandleEvent(ctx, evt)
}
-
type handleOpCreateUpdate func(context.Context, string, string, string, *[]byte, *cid.Cid) error
-
type handleOpDelete func(context.Context, string, string, string) error
-
-
func (b *Backend) HandleCreateOp(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error {
+
func (hs *HandlerService) HandleCreate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
if !strings.HasPrefix(path, "app.bsky.feed.generator/") {
return nil
}
-
-
sl := slog.With("source", "HandleCreateOp")
var out appbsky.FeedGenerator
if err := out.UnmarshalCBOR(bytes.NewReader(*rec)); err != nil {
-
sl.Error("failed to unmarshal record", "err", err)
-
return fmt.Errorf("failed to unmarshal record: %w", err)
+
return fmt.Errorf("error unmarshalling record: %w", err)
}
-
feedgen := &FeedGenerator{
-
AtUri: fmt.Sprintf("at://%s/%s", repo, path),
-
DisplayName: out.DisplayName,
-
FeedService: out.Did,
-
CreatedAt: out.CreatedAt,
-
Description: out.Description,
-
ContentMode: out.ContentMode,
-
AcceptsInteractions: out.AcceptsInteractions,
+
feedgen := Feedgen{
+
DisplayName: out.DisplayName,
}
-
if err := b.data.Model(&FeedGenerator{}).Create(feedgen).Error; err != nil {
-
return fmt.Errorf("error adding feedgen to database: %w", err)
+
if err := hs.store.Create(&feedgen).Error; err != nil {
+
return fmt.Errorf("error saving feedgen: %w", err)
}
return nil
}
// HandleUpdate is the backfiller's record-update callback. Record updates
// are not indexed yet, so this is a deliberate no-op stub.
func (hs *HandlerService) HandleUpdate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
	return nil
}
// HandleDelete is the backfiller's record-deletion callback. Deletions are
// not indexed yet, so this is a deliberate no-op stub.
func (hs *HandlerService) HandleDelete(ctx context.Context, repo string, rev string, path string) error {
	return nil
}
+75 -107
cmd/backfiller/main.go
···
import (
"context"
+
"fmt"
"log/slog"
-
"net/http"
-
"os"
"os/signal"
"syscall"
"time"
···
"github.com/bluesky-social/indigo/backfill"
"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/events/schedulers/parallel"
-
"github.com/gorilla/websocket"
"gorm.io/gorm"
)
// App wires together the services that make up the backfiller: cursor
// persistence, record handling, the repo census, the backfill engine, and
// the firehose websocket, plus the two sqlite-backed gorm handles.
type App struct {
	backfill *backfill.Backfiller
	cursor   *CursorService
	handler  *HandlerService
	census   *CensusService
	// NOTE(review): this commit removes the gorilla/websocket import from
	// main.go while still declaring this field — confirm the import is
	// retained or this file won't compile.
	wsconn *websocket.Conn
	// state holds backfill jobs and cursors ("state.db"); content holds
	// indexed records ("content.db").
	state   *gorm.DB
	content *gorm.DB
}
-
ParallelBackfills: 10,
-
ParallelRecordCreates: 1, // sqlite
-
SyncRequestsPerSecond: 5,
+
func NewApp() *App {
+
stateDatabase := NewDatabase("state.db")
+
stateDatabase.AutoMigrate(&backfill.GormDBJob{})
-
// NSIDFilter: "app.bsky.feed.generator",
-
RelayHost: "https://bsky.network",
+
contentDatabase := NewDatabase("content.db")
+
+
return &App{
+
state: stateDatabase,
+
content: contentDatabase,
}
+
}
-
return backfill.NewBackfiller(
-
"backfills",
-
backfill.NewGormstore(db),
-
create, update, delete,
-
opts,
-
)
-
}
// Start spins up the background services in dependency order, then blocks
// consuming the relay firehose until ctx is cancelled or the stream fails.
func (app *App) Start(ctx context.Context) error {
	// Cursor service first: everything below reads or records cursors.
	app.cursor = NewCursorService(app.state)
	go app.cursor.CheckpointCursors(ctx)

	app.handler = NewHandlerService(app.content)

	// Backfill jobs live in the state DB; record callbacks write content.
	app.backfill = NewBackfillService(backfill.NewGormstore(app.state), app.handler)
	go app.backfill.Start()

	// The census enumerates every repo on the relay and enqueues jobs.
	app.census = NewCensusService(app.cursor, app.backfill)
	go app.census.Start(ctx)

	wsconn, err := NewFirehoseConnection(ctx, app.cursor)
	if err != nil {
		return fmt.Errorf("error connecting to relay: %w", err)
	}
	app.wsconn = wsconn

	rsc := events.RepoStreamCallbacks{
		RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
			// Track the sequence number so the checkpointer can persist
			// it, then hand the commit to the backfiller.
			app.cursor.SetFirehoseCursor(evt.Seq)
			return app.backfill.HandleEvent(ctx, evt)
		},
		// TODO account
		// TODO identity
	}
	sched := parallel.NewScheduler(4, 50, "firehose", rsc.EventHandler)

	// Blocks until the stream ends. NOTE(review): a nil logger is passed
	// here — confirm indigo's HandleRepoStream tolerates nil.
	if err := events.HandleRepoStream(ctx, app.wsconn, sched, nil); err != nil {
		return fmt.Errorf("error starting repo stream handler: %w", err)
	}
	return nil
}
-
go func() {
-
if err := backend.PumpRepos(streamCtx); err != nil {
-
sl.Error("failed pumping repos", "err", err)
+
func (app *App) Stop(ctx context.Context) error {
+
closeDatabase := func(db *gorm.DB) error {
+
raw, err := db.DB()
+
if err != nil {
+
return fmt.Errorf("error getting raw DB: %w", err)
+
}
+
if err := raw.Close(); err != nil {
+
return fmt.Errorf("error closing DB: %w", err)
}
-
}()
+
return nil
+
}
-
quit := make(chan struct{})
-
exitSignals := make(chan os.Signal, 1)
-
signal.Notify(exitSignals, syscall.SIGINT, syscall.SIGTERM)
-
go func() {
-
select {
-
case sig := <-exitSignals:
-
sl.Info("received OS exit signal", "signal", sig)
-
case <-streamClosed:
-
//
-
}
+
if err := closeDatabase(app.state); err != nil {
+
return err
+
}
-
conn.Close()
+
if err := closeDatabase(app.content); err != nil {
+
return err
+
}
-
streamCancel()
-
<-streamClosed
+
if err := app.backfill.Stop(ctx); err != nil {
+
return err
+
}
-
time.Sleep(time.Millisecond * 100)
+
return nil
+
}
-
endctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
-
defer cancel()
-
bf.Stop(endctx)
+
func main() {
+
ctx, cancel := signal.NotifyContext(context.TODO(), syscall.SIGINT, syscall.SIGTERM)
+
defer cancel()
-
close(quit)
-
}()
+
app := NewApp()
+
if err := app.Start(ctx); err != nil {
+
slog.Error("failed to start backfiller", "err", err)
+
}
-
<-quit
+
<-ctx.Done()
+
slog.Info("shutting down")
-
sl.Info("flushing cursors")
-
if err := backend.FlushCursors(); err != nil {
-
sl.Error("failed to flush cursors on close", "err", err)
-
}
+
endctx, cancel := context.WithTimeout(context.TODO(), time.Second*15)
+
defer cancel()
-
sl.Info("closing databases")
-
if err := backend.CleanlyClose(); err != nil {
-
sl.Error("failed to close databases", "err", err)
+
if err := app.Stop(endctx); err != nil {
+
slog.Error("error during shutdown", "err", err)
}
}
+10 -9
cmd/backfiller/models.go
···
package main
// Gorm models for indexed record content. Most collection types are empty
// placeholders that are not migrated or populated yet; only Profile and
// Feedgen are migrated by NewHandlerService, and only Feedgen rows are
// written (by HandleCreate).
type Account struct{}
type Profile struct{}
type List struct{}
type Labeler struct{}

// Feedgen is one row per app.bsky.feed.generator record seen.
type Feedgen struct {
	ID          uint `gorm:"primaryKey"`
	DisplayName string
}

type StarterPack struct{}
type Verification struct{}
type Lexicon struct{}
-69
cmd/backfiller/pump.go
···
-
package main
-
-
import (
-
"context"
-
"fmt"
-
"log/slog"
-
-
"github.com/bluesky-social/indigo/api/atproto"
-
"github.com/bluesky-social/indigo/backfill"
-
"github.com/bluesky-social/indigo/xrpc"
-
)
-
-
type jobMaker interface {
-
GetOrCreateJob(context.Context, string, string) (backfill.Job, error)
-
}
-
-
func (b *Backend) PumpRepos(ctx context.Context) error {
-
sl := slog.With("source", "pumpRepos")
-
bf := b.bf
-
-
xrpcc := &xrpc.Client{
-
Host: "https://bsky.network",
-
}
-
-
jmstore, ok := bf.Store.(jobMaker)
-
if !ok {
-
return fmt.Errorf("configured job store doesn't support random job creation")
-
}
-
-
curs, err := b.LoadCursor("repos")
-
if err != nil {
-
sl.Error("failed to load repos cursor", "err", err)
-
}
-
-
for {
-
select {
-
case <-ctx.Done():
-
sl.Info("stopping repo pump")
-
return nil
-
default:
-
//
-
}
-
-
sl.Info("listing repos", "cursor", curs)
-
res, err := atproto.SyncListRepos(ctx, xrpcc, curs, 1000)
-
if err != nil {
-
return fmt.Errorf("error listing repos: %w", err)
-
}
-
-
for _, repo := range res.Repos {
-
_, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
-
if err != nil {
-
sl.Warn("failed to create backfill job", "err", err)
-
continue
-
}
-
}
-
-
if res.Cursor != nil && *res.Cursor != "" {
-
curs = *res.Cursor
-
b.reposLk.Lock()
-
b.reposSeq = curs
-
b.reposLk.Unlock()
-
} else {
-
break
-
}
-
}
-
-
return nil
-
}