// An app.bsky.* indexer.
1package main 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "log" 8 "log/slog" 9 "net/http" 10 "os" 11 "os/signal" 12 "syscall" 13 "time" 14 15 "github.com/bluesky-social/indigo/api/atproto" 16 comatproto "github.com/bluesky-social/indigo/api/atproto" 17 "github.com/bluesky-social/indigo/backfill" 18 "github.com/bluesky-social/indigo/events" 19 "github.com/bluesky-social/indigo/events/schedulers/parallel" 20 "github.com/bluesky-social/indigo/xrpc" 21 22 "github.com/gorilla/websocket" 23 "github.com/ipfs/go-cid" 24 "gorm.io/driver/sqlite" 25 "gorm.io/gorm" 26 "gorm.io/gorm/logger" 27) 28 29func handleCreate(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error { 30 return nil 31} 32 33func handleUpdate(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error { 34 return nil 35} 36 37func handleDelete(ctx context.Context, repo, rev, path string) error { 38 return nil 39} 40 41func main() { 42 ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM) 43 defer stop() 44 45 sl := slog.With("source", "backfiller") 46 47 newLogger := logger.New( 48 log.New(os.Stdout, "\n", log.LstdFlags), 49 logger.Config{ 50 SlowThreshold: 5*time.Second, 51 Colorful: false, 52 }, 53 ) 54 db, err := gorm.Open(sqlite.Open("state.db"), &gorm.Config{ 55 Logger: newLogger, 56 }) 57 if err != nil { 58 sl.Error("failed to connect to database", "err", err) 59 } 60 db.AutoMigrate(&backfill.GormDBJob{}) 61 store := backfill.NewGormstore(db) 62 63 // connect to the relay 64 con, _, err := websocket.DefaultDialer.Dial("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos", http.Header{ 65 "User-Agent": []string{"backfiller/0.1 (@edavis.dev)"}, 66 }) 67 if err != nil { 68 sl.Error("failed to connect to relay", "err", err) 69 } 70 71 // start backfilling 72 opts := &backfill.BackfillOptions{ 73 ParallelBackfills: 10, 74 ParallelRecordCreates: 100, 75 SyncRequestsPerSecond: 4, 76 RelayHost: 
"https://bsky.network", 77 } 78 bf := backfill.NewBackfiller("backfills", store, handleCreate, handleUpdate, handleDelete, opts) 79 go bf.Start() 80 81 // pump repos 82 go func(bf *backfill.Backfiller) { 83 if err := pumpRepos(context.TODO(), bf); err != nil { 84 sl.Error("failed pumping repos", "err", err) 85 } 86 }(bf) 87 88 // read from the firehose 89 rsc := events.RepoStreamCallbacks{ 90 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 91 job, err := bf.Store.GetJob(ctx, evt.Repo) 92 if job == nil { 93 if errors.Is(err, backfill.ErrJobNotFound) { 94 return nil 95 } else { 96 return fmt.Errorf("error getting job: %w", err) 97 } 98 } else { 99 return bf.HandleEvent(ctx, evt) 100 } 101 }, 102 } 103 sched := parallel.NewScheduler(16, 100, "firehose", rsc.EventHandler) 104 if err := events.HandleRepoStream(ctx, con, sched, nil); err != nil { 105 sl.Error("failed to start scheduler", "err", err) 106 } 107 108 <-ctx.Done() 109 bf.Stop(context.TODO()) 110} 111 112type jobMaker interface { 113 GetOrCreateJob(context.Context, string, string) (backfill.Job, error) 114} 115 116func pumpRepos(ctx context.Context, bf *backfill.Backfiller) error { 117 sl := slog.With("source", "pumpRepos") 118 119 xrpcc := &xrpc.Client{ 120 Host: "https://bsky.network", 121 } 122 123 jmstore, ok := bf.Store.(jobMaker) 124 if !ok { 125 return fmt.Errorf("configured job store doesn't support random job creation") 126 } 127 128 var curs string 129 for { 130 sl.Info("listing repos", "cursor", curs) 131 res, err := atproto.SyncListRepos(ctx, xrpcc, curs, 1000) 132 if err != nil { 133 return fmt.Errorf("error listing repos: %w", err) 134 } 135 136 for _, repo := range res.Repos { 137 _, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued) 138 if err != nil { 139 sl.Warn("failed to create backfill job", "err", err) 140 continue 141 } 142 } 143 144 if res.Cursor != nil && *res.Cursor != "" { 145 curs = *res.Cursor 146 } else { 147 break 148 } 149 } 150 151 return 
nil 152}