an app.bsky.* indexer
1package main 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 8 "github.com/bluesky-social/indigo/api/atproto" 9 "github.com/bluesky-social/indigo/backfill" 10 "github.com/bluesky-social/indigo/xrpc" 11) 12 13type jobMaker interface { 14 GetOrCreateJob(context.Context, string, string) (backfill.Job, error) 15} 16 17func (b *Backend) PumpRepos(ctx context.Context) error { 18 sl := slog.With("source", "pumpRepos") 19 20 if err := pumpRepos(ctx, b); err != nil { 21 sl.Error("failed pumping repos", "err", err) 22 } 23 24 return nil 25} 26 27func pumpRepos(ctx context.Context, backend *Backend) error { 28 sl := slog.With("source", "pumpRepos") 29 bf := backend.bf 30 31 xrpcc := &xrpc.Client{ 32 Host: "https://bsky.network", 33 } 34 35 jmstore, ok := bf.Store.(jobMaker) 36 if !ok { 37 return fmt.Errorf("configured job store doesn't support random job creation") 38 } 39 40 cursor, err := backend.LoadCursor("repos") 41 if err != nil { 42 sl.Error("failed to load repos cursor", "err", err) 43 } 44 curs, _ := cursor.(string) 45 46 for { 47 select { 48 case <-ctx.Done(): 49 sl.Info("stopping repo pump") 50 return nil 51 default: 52 } 53 54 sl.Info("listing repos", "cursor", curs) 55 res, err := atproto.SyncListRepos(ctx, xrpcc, curs, 1000) 56 if err != nil { 57 return fmt.Errorf("error listing repos: %w", err) 58 } 59 60 for _, repo := range res.Repos { 61 _, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued) 62 if err != nil { 63 sl.Warn("failed to create backfill job", "err", err) 64 continue 65 } 66 } 67 68 if res.Cursor != nil && *res.Cursor != "" { 69 cursor = *res.Cursor 70 } else { 71 break 72 } 73 } 74 75 return nil 76}