An indexer for `app.bsky.*` records.
1package main 2 3import ( 4 "context" 5 "fmt" 6 "log" 7 "log/slog" 8 "net/http" 9 "os" 10 "os/signal" 11 "syscall" 12 "time" 13 14 "github.com/bluesky-social/indigo/api/atproto" 15 "github.com/bluesky-social/indigo/backfill" 16 "github.com/bluesky-social/indigo/events" 17 "github.com/bluesky-social/indigo/events/schedulers/parallel" 18 19 "github.com/gorilla/websocket" 20 "gorm.io/driver/sqlite" 21 "gorm.io/gorm" 22 "gorm.io/gorm/logger" 23) 24 25func NewDatabase() *gorm.DB { 26 sl := slog.With("source", "database") 27 28 newLogger := logger.New( 29 log.New(os.Stdout, "\n", log.LstdFlags), 30 logger.Config{ 31 SlowThreshold: 1 * time.Second, 32 Colorful: false, 33 }, 34 ) 35 db, err := gorm.Open(sqlite.Open("state.db"), &gorm.Config{ 36 Logger: newLogger, 37 }) 38 if err != nil { 39 sl.Error("failed to connect to database", "err", err) 40 } 41 db.AutoMigrate(&backfill.GormDBJob{}) 42 db.AutoMigrate(&cursorRecord{}) 43 return db 44} 45 46func NewBackfiller(db *gorm.DB) *backfill.Backfiller { 47 opts := &backfill.BackfillOptions{ 48 ParallelBackfills: 50, 49 ParallelRecordCreates: 25, 50 SyncRequestsPerSecond: 25, 51 RelayHost: "https://bsky.network", 52 } 53 return backfill.NewBackfiller( 54 "backfills", 55 backfill.NewGormstore(db), 56 handleCreate, 57 handleUpdate, 58 handleDelete, 59 opts, 60 ) 61} 62 63func NewFirehose(backend *Backend) *websocket.Conn { 64 sl := slog.With("source", "firehose") 65 66 subscribeUrl := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos" 67 68 cursor, err := backend.LoadCursor("firehose") 69 if err != nil { 70 sl.Error("failed loading firehose cursor", "err", err) 71 } 72 cc, ok := cursor.(int64) 73 if ok { 74 subscribeUrl += fmt.Sprintf("?cursor=%d", cc) 75 } 76 77 conn, _, err := websocket.DefaultDialer.Dial(subscribeUrl, http.Header{ 78 "User-Agent": []string{"backfiller/0.1 (@edavis.dev)"}, 79 }) 80 if err != nil { 81 sl.Error("failed to connect to relay", "err", err) 82 } 83 return conn 84} 85 86func NewScheduler(ctx 
context.Context, backend *Backend) *parallel.Scheduler { 87 rsc := events.RepoStreamCallbacks{ 88 RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { 89 return backend.RepoCommitHandler(ctx, evt) 90 }, 91 } 92 return parallel.NewScheduler(16, 100, "firehose", rsc.EventHandler) 93} 94 95func main() { 96 ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM) 97 defer stop() 98 99 sl := slog.With("source", "backfiller") 100 101 db := NewDatabase() 102 103 bf := NewBackfiller(db) 104 go bf.Start() 105 106 backend := NewBackend(db, bf) 107 go backend.SyncCursors() 108 go backend.PumpRepos(ctx) 109 110 conn := NewFirehose(backend) 111 sched := NewScheduler(ctx, backend) 112 if err := events.HandleRepoStream(ctx, conn, sched, sl); err != nil { 113 sl.Error("failed to start scheduler", "err", err) 114 } 115 <-ctx.Done() 116 bf.Stop(context.TODO()) 117}