an app.bsky.* indexer
1package main 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 8 "github.com/bluesky-social/indigo/api/atproto" 9 "github.com/bluesky-social/indigo/backfill" 10 "github.com/bluesky-social/indigo/xrpc" 11) 12 13type jobMaker interface { 14 GetOrCreateJob(context.Context, string, string) (backfill.Job, error) 15} 16 17func (b *Backend) PumpRepos(ctx context.Context) error { 18 sl := slog.With("source", "pumpRepos") 19 bf := b.bf 20 21 xrpcc := &xrpc.Client{ 22 Host: "https://bsky.network", 23 } 24 25 jmstore, ok := bf.Store.(jobMaker) 26 if !ok { 27 return fmt.Errorf("configured job store doesn't support random job creation") 28 } 29 30 curs, err := b.LoadCursor("repos") 31 if err != nil { 32 sl.Error("failed to load repos cursor", "err", err) 33 } 34 35 for { 36 select { 37 case <-ctx.Done(): 38 sl.Info("stopping repo pump") 39 return nil 40 default: 41 // 42 } 43 44 sl.Info("listing repos", "cursor", curs) 45 res, err := atproto.SyncListRepos(ctx, xrpcc, curs, 1000) 46 if err != nil { 47 return fmt.Errorf("error listing repos: %w", err) 48 } 49 50 for _, repo := range res.Repos { 51 _, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued) 52 if err != nil { 53 sl.Warn("failed to create backfill job", "err", err) 54 continue 55 } 56 } 57 58 if res.Cursor != nil && *res.Cursor != "" { 59 curs = *res.Cursor 60 b.reposLk.Lock() 61 b.reposSeq = curs 62 b.reposLk.Unlock() 63 } else { 64 break 65 } 66 } 67 68 return nil 69}