an app.bsky.* indexer
1package main 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 8 "github.com/bluesky-social/indigo/api/atproto" 9 "github.com/bluesky-social/indigo/backfill" 10 "github.com/bluesky-social/indigo/xrpc" 11) 12 13type jobMaker interface { 14 GetOrCreateJob(context.Context, string, string) (backfill.Job, error) 15} 16 17func (b *Backend) PumpRepos(ctx context.Context) error { 18 sl := slog.With("source", "pumpRepos") 19 20 if err := pumpRepos(ctx, b); err != nil { 21 sl.Error("failed pumping repos", "err", err) 22 } 23 24 return nil 25} 26 27func pumpRepos(ctx context.Context, backend *Backend) error { 28 sl := slog.With("source", "pumpRepos") 29 bf := backend.bf 30 31 xrpcc := &xrpc.Client{ 32 Host: "https://bsky.network", 33 } 34 35 jmstore, ok := bf.Store.(jobMaker) 36 if !ok { 37 return fmt.Errorf("configured job store doesn't support random job creation") 38 } 39 40 curs, err := backend.LoadCursor("repos") 41 if err != nil { 42 sl.Error("failed to load repos cursor", "err", err) 43 } 44 45 for { 46 select { 47 case <-ctx.Done(): 48 sl.Info("stopping repo pump") 49 return nil 50 default: 51 } 52 53 sl.Info("listing repos", "cursor", curs) 54 res, err := atproto.SyncListRepos(ctx, xrpcc, curs, 1000) 55 if err != nil { 56 return fmt.Errorf("error listing repos: %w", err) 57 } 58 59 for _, repo := range res.Repos { 60 _, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued) 61 if err != nil { 62 sl.Warn("failed to create backfill job", "err", err) 63 continue 64 } 65 } 66 67 if res.Cursor != nil && *res.Cursor != "" { 68 curs = *res.Cursor 69 backend.reposLk.Lock() 70 backend.reposSeq = curs 71 backend.reposLk.Unlock() 72 } else { 73 break 74 } 75 } 76 77 return nil 78}