an app.bsky.* indexer

delete cmd/backfiller

-26
cmd/backfiller/backend.go
···
-
package main
-
-
import (
-
"sync"
-
-
"github.com/bluesky-social/indigo/backfill"
-
"gorm.io/gorm"
-
)
-
-
type Backend struct {
-
state *gorm.DB
-
data *gorm.DB
-
bf *backfill.Backfiller
-
-
firehoseLk sync.Mutex
-
firehoseSeq string
-
reposLk sync.Mutex
-
reposSeq string
-
}
-
-
func NewBackend(state, data *gorm.DB) *Backend {
-
return &Backend{
-
state: state,
-
data: data,
-
}
-
}
-66
cmd/backfiller/cursors.go
···
-
package main
-
-
import (
-
"context"
-
"errors"
-
"fmt"
-
"log/slog"
-
"time"
-
-
"gorm.io/gorm"
-
)
-
-
type cursorRecord struct {
-
ID uint `gorm:"primaryKey"`
-
Key string `gorm:"unique"`
-
Val string
-
}
-
-
func (b *Backend) LoadCursor(key string) (string, error) {
-
var rec cursorRecord
-
if err := b.state.Where("key = ?", key).First(&rec).Error; err != nil {
-
if errors.Is(err, gorm.ErrRecordNotFound) {
-
b.state.Create(&cursorRecord{Key: key, Val: ""})
-
}
-
return "", err
-
}
-
return rec.Val, nil
-
}
-
-
func (b *Backend) FlushCursors() error {
-
sl := slog.With("source", "flushCursors")
-
-
b.firehoseLk.Lock()
-
sl.Info("persisting firehose cursor", "cursor", b.firehoseSeq)
-
if err := b.state.Model(&cursorRecord{}).Where("key = ?", "firehose").Update("val", b.firehoseSeq).Error; err != nil {
-
return fmt.Errorf("failed to persist firehose cursor: %w", err)
-
}
-
b.firehoseLk.Unlock()
-
-
b.reposLk.Lock()
-
sl.Info("persisting repos cursor", "cursor", b.reposSeq)
-
if err := b.state.Model(&cursorRecord{}).Where("key = ?", "repos").Update("val", b.reposSeq).Error; err != nil {
-
return fmt.Errorf("failed to persist repos cursor: %w", err)
-
}
-
b.reposLk.Unlock()
-
-
return nil
-
}
-
-
func (b *Backend) SyncCursors(ctx context.Context) error {
-
for range time.Tick(time.Second * 5) {
-
select {
-
case <-ctx.Done():
-
return nil
-
default:
-
//
-
}
-
-
if err := b.FlushCursors(); err != nil {
-
slog.Error("failed to flush cursors", "err", err)
-
return fmt.Errorf("failed to flush cursors: %w", err)
-
}
-
}
-
-
return nil
-
}
-58
cmd/backfiller/database.go
···
-
package main
-
-
import (
-
"fmt"
-
"log"
-
"log/slog"
-
"os"
-
"time"
-
-
"gorm.io/driver/sqlite"
-
"gorm.io/gorm"
-
"gorm.io/gorm/logger"
-
)
-
-
func NewDatabase(path string) *gorm.DB {
-
sl := slog.With("source", "database")
-
l := logger.New(
-
log.New(os.Stdout, "\r\n", log.LstdFlags),
-
logger.Config{
-
SlowThreshold: time.Second,
-
Colorful: false,
-
},
-
)
-
db, err := gorm.Open(sqlite.Open(path), &gorm.Config{
-
Logger: l,
-
})
-
if err != nil {
-
sl.Error("failed to open database", "err", err)
-
}
-
db.Exec("PRAGMA journal_mode=WAL")
-
return db
-
}
-
-
func (b *Backend) CleanlyClose() error {
-
closeDb := func(db *gorm.DB) error {
-
if err := db.Exec("PRAGMA wal_checkpoint(TRUNCATE)").Error; err != nil {
-
return fmt.Errorf("failed checkpointing the WAL: %w", err)
-
}
-
rawDb, err := db.DB()
-
if err != nil {
-
return fmt.Errorf("failed getting underlying DB connection: %w", err)
-
}
-
if err := rawDb.Close(); err != nil {
-
return fmt.Errorf("failed closing underlying DB connection: %w", err)
-
}
-
return nil
-
}
-
-
if err := closeDb(b.state); err != nil {
-
return fmt.Errorf("failed to close state database: %w", err)
-
}
-
-
if err := closeDb(b.data); err != nil {
-
return fmt.Errorf("failed to close content database: %w", err)
-
}
-
-
return nil
-
}
-74
cmd/backfiller/handlers.go
···
-
package main
-
-
import (
-
"bytes"
-
"context"
-
"fmt"
-
"log/slog"
-
"strconv"
-
"strings"
-
-
comatproto "github.com/bluesky-social/indigo/api/atproto"
-
appbsky "github.com/bluesky-social/indigo/api/bsky"
-
"github.com/ipfs/go-cid"
-
)
-
-
type commitHandler func(context.Context, *comatproto.SyncSubscribeRepos_Commit) error
-
-
func (b *Backend) RepoCommitHandler(
-
ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit,
-
) error {
-
select {
-
case <-ctx.Done():
-
return nil
-
default:
-
//
-
}
-
-
b.firehoseLk.Lock()
-
b.firehoseSeq = strconv.Itoa(int(evt.Seq))
-
b.firehoseLk.Unlock()
-
-
return b.bf.HandleEvent(ctx, evt)
-
}
-
-
type handleOpCreateUpdate func(context.Context, string, string, string, *[]byte, *cid.Cid) error
-
type handleOpDelete func(context.Context, string, string, string) error
-
-
func (b *Backend) HandleCreateOp(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error {
-
if !strings.HasPrefix(path, "app.bsky.feed.generator/") {
-
return nil
-
}
-
-
sl := slog.With("source", "HandleCreateOp")
-
-
var out appbsky.FeedGenerator
-
if err := out.UnmarshalCBOR(bytes.NewReader(*rec)); err != nil {
-
sl.Error("failed to unmarshal record", "err", err)
-
return fmt.Errorf("failed to unmarshal record: %w", err)
-
}
-
-
feedgen := &FeedGenerator{
-
AtUri: fmt.Sprintf("at://%s/%s", repo, path),
-
DisplayName: out.DisplayName,
-
FeedService: out.Did,
-
CreatedAt: out.CreatedAt,
-
Description: out.Description,
-
ContentMode: out.ContentMode,
-
AcceptsInteractions: out.AcceptsInteractions,
-
}
-
-
if err := b.data.Model(&FeedGenerator{}).Create(feedgen).Error; err != nil {
-
return fmt.Errorf("error adding feedgen to database: %w", err)
-
}
-
-
return nil
-
}
-
-
func (b *Backend) HandleUpdateOp(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error {
-
return nil
-
}
-
-
func (b *Backend) HandleDeleteOp(ctx context.Context, repo, rev, path string) error {
-
return nil
-
}
-154
cmd/backfiller/main.go
···
-
package main
-
-
import (
-
"context"
-
"log/slog"
-
"net/http"
-
"os"
-
"os/signal"
-
"syscall"
-
"time"
-
-
comatproto "github.com/bluesky-social/indigo/api/atproto"
-
"github.com/bluesky-social/indigo/backfill"
-
"github.com/bluesky-social/indigo/events"
-
"github.com/bluesky-social/indigo/events/schedulers/parallel"
-
-
"github.com/gorilla/websocket"
-
"gorm.io/gorm"
-
)
-
-
func NewBackfiller(
-
db *gorm.DB, create handleOpCreateUpdate, update handleOpCreateUpdate, delete handleOpDelete,
-
) *backfill.Backfiller {
-
opts := &backfill.BackfillOptions{
-
// ParallelBackfills: 50,
-
// ParallelRecordCreates: 25,
-
// SyncRequestsPerSecond: 25,
-
-
ParallelBackfills: 10,
-
ParallelRecordCreates: 1, // sqlite
-
SyncRequestsPerSecond: 5,
-
-
// NSIDFilter: "app.bsky.feed.generator",
-
RelayHost: "https://bsky.network",
-
}
-
-
return backfill.NewBackfiller(
-
"backfills",
-
backfill.NewGormstore(db),
-
create, update, delete,
-
opts,
-
)
-
}
-
-
func NewFirehose(ctx context.Context, cursor string) *websocket.Conn {
-
sl := slog.With("source", "firehose")
-
subscribeUrl := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
-
if cursor != "" {
-
subscribeUrl += "?cursor=" + cursor
-
}
-
-
conn, _, err := websocket.DefaultDialer.DialContext(ctx, subscribeUrl, http.Header{
-
"User-Agent": []string{"backfiller/0.1 (@edavis.dev)"},
-
})
-
if err != nil {
-
sl.Error("failed to connect to relay", "err", err)
-
}
-
-
return conn
-
}
-
-
func NewScheduler(
-
ctx context.Context, commitCallback commitHandler,
-
) *parallel.Scheduler {
-
rsc := events.RepoStreamCallbacks{
-
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
-
return commitCallback(ctx, evt)
-
},
-
}
-
-
return parallel.NewScheduler(16, 100, "firehose", rsc.EventHandler)
-
}
-
-
func main() {
-
sl := slog.With("source", "backfiller")
-
-
streamClosed := make(chan struct{})
-
streamCtx, streamCancel := context.WithCancel(context.Background())
-
-
stateDb := NewDatabase("state.db")
-
stateDb.AutoMigrate(&backfill.GormDBJob{})
-
stateDb.AutoMigrate(&cursorRecord{})
-
-
contentDb := NewDatabase("database.db")
-
contentDb.AutoMigrate(&FeedGenerator{})
-
-
backend := NewBackend(stateDb, contentDb)
-
-
bf := NewBackfiller(stateDb, backend.HandleCreateOp, backend.HandleUpdateOp, backend.HandleDeleteOp)
-
go bf.Start()
-
-
// attach the backfiller to the backend so the curors, the pump,
-
// and the commit callback can use it
-
backend.bf = bf
-
-
go backend.SyncCursors(streamCtx)
-
-
cursor, err := backend.LoadCursor("firehose")
-
if err != nil {
-
sl.Error("failed loading firehose cursor", "err", err)
-
}
-
conn := NewFirehose(streamCtx, cursor)
-
-
sched := NewScheduler(streamCtx, backend.RepoCommitHandler)
-
go func() {
-
if err := events.HandleRepoStream(streamCtx, conn, sched, sl); err != nil {
-
sl.Error("failed to start scheduler", "err", err)
-
}
-
close(streamClosed)
-
}()
-
-
go func() {
-
if err := backend.PumpRepos(streamCtx); err != nil {
-
sl.Error("failed pumping repos", "err", err)
-
}
-
}()
-
-
quit := make(chan struct{})
-
exitSignals := make(chan os.Signal, 1)
-
signal.Notify(exitSignals, syscall.SIGINT, syscall.SIGTERM)
-
go func() {
-
select {
-
case sig := <-exitSignals:
-
sl.Info("received OS exit signal", "signal", sig)
-
case <-streamClosed:
-
//
-
}
-
-
conn.Close()
-
-
streamCancel()
-
<-streamClosed
-
-
time.Sleep(time.Millisecond * 100)
-
-
endctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
-
defer cancel()
-
bf.Stop(endctx)
-
-
close(quit)
-
}()
-
-
<-quit
-
-
sl.Info("flushing cursors")
-
if err := backend.FlushCursors(); err != nil {
-
sl.Error("failed to flush cursors on close", "err", err)
-
}
-
-
sl.Info("closing databases")
-
if err := backend.CleanlyClose(); err != nil {
-
sl.Error("failed to close databases", "err", err)
-
}
-
}
-12
cmd/backfiller/models.go
···
-
package main
-
-
type FeedGenerator struct {
-
ID uint `gorm:"primaryKey"`
-
AtUri string `gorm:"unique"`
-
DisplayName string
-
FeedService string
-
Description *string
-
ContentMode *string
-
AcceptsInteractions *bool
-
CreatedAt string
-
}
-69
cmd/backfiller/pump.go
···
-
package main
-
-
import (
-
"context"
-
"fmt"
-
"log/slog"
-
-
"github.com/bluesky-social/indigo/api/atproto"
-
"github.com/bluesky-social/indigo/backfill"
-
"github.com/bluesky-social/indigo/xrpc"
-
)
-
-
type jobMaker interface {
-
GetOrCreateJob(context.Context, string, string) (backfill.Job, error)
-
}
-
-
func (b *Backend) PumpRepos(ctx context.Context) error {
-
sl := slog.With("source", "pumpRepos")
-
bf := b.bf
-
-
xrpcc := &xrpc.Client{
-
Host: "https://bsky.network",
-
}
-
-
jmstore, ok := bf.Store.(jobMaker)
-
if !ok {
-
return fmt.Errorf("configured job store doesn't support random job creation")
-
}
-
-
curs, err := b.LoadCursor("repos")
-
if err != nil {
-
sl.Error("failed to load repos cursor", "err", err)
-
}
-
-
for {
-
select {
-
case <-ctx.Done():
-
sl.Info("stopping repo pump")
-
return nil
-
default:
-
//
-
}
-
-
sl.Info("listing repos", "cursor", curs)
-
res, err := atproto.SyncListRepos(ctx, xrpcc, curs, 1000)
-
if err != nil {
-
return fmt.Errorf("error listing repos: %w", err)
-
}
-
-
for _, repo := range res.Repos {
-
_, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
-
if err != nil {
-
sl.Warn("failed to create backfill job", "err", err)
-
continue
-
}
-
}
-
-
if res.Cursor != nil && *res.Cursor != "" {
-
curs = *res.Cursor
-
b.reposLk.Lock()
-
b.reposSeq = curs
-
b.reposLk.Unlock()
-
} else {
-
break
-
}
-
}
-
-
return nil
-
}