An app.bsky.* indexer (currently backfills `app.bsky.labeler.service` records from the bsky.network relay).
1package main 2 3import ( 4 "context" 5 "net/http" 6 // "strings" 7 // "log/slog" 8 "log" 9 "os" 10 "os/signal" 11 "syscall" 12 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/backfill" 15 "github.com/bluesky-social/indigo/events" 16 "github.com/bluesky-social/indigo/events/schedulers/parallel" 17 18 "github.com/gorilla/websocket" 19 "github.com/ipfs/go-cid" 20 "gorm.io/driver/sqlite" 21 "gorm.io/gorm" 22) 23 24func handleCreate(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error { 25 log.Printf("handleCreate: at://%s/%s", repo, path) 26 return nil 27} 28 29func handleUpdate(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error { 30 return nil 31} 32 33func handleDelete(ctx context.Context, repo, rev, path string) error { 34 return nil 35} 36 37func main() { 38 ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM) 39 defer stop() 40 41 db, err := gorm.Open(sqlite.Open("backfiller.db"), &gorm.Config{}) 42 if err != nil { 43 log.Fatalf("failed to connect database: %s", err) 44 } 45 db.AutoMigrate(&backfill.GormDBJob{}) 46 store := backfill.NewGormstore(db) 47 48 // connect to the relay 49 d := websocket.DefaultDialer 50 con, _, err := d.Dial("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos", http.Header{}) 51 if err != nil { 52 log.Fatalf("failed to connect to relay: %s", err) 53 } 54 55 rsc := events.RepoStreamCallbacks{ 56 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 57 // log.Printf("enqueuing: %s", evt.Repo) 58 return store.EnqueueJobWithState(ctx, evt.Repo, backfill.StateEnqueued) 59 }, 60 } 61 62 // start backfilling 63 opts := &backfill.BackfillOptions{ 64 ParallelBackfills: 10, 65 ParallelRecordCreates: 100, 66 NSIDFilter: "app.bsky.labeler.service", 67 SyncRequestsPerSecond: 10, 68 RelayHost: "https://bsky.network", 69 } 70 bf := backfill.NewBackfiller("backfiller-labels", store, 
handleCreate, handleUpdate, handleDelete, opts) 71 go bf.Start() 72 73 sched := parallel.NewScheduler(4, 1000, "backfiller-labels", rsc.EventHandler) 74 if err := events.HandleRepoStream(ctx, con, sched, nil); err != nil { 75 log.Fatalf("failed to start scheduler: %s", err) 76 } 77 78 <-ctx.Done() 79 bf.Stop(ctx) 80}