an app.bsky.* indexer
1package main 2 3import ( 4 "context" 5 "log/slog" 6 7 "github.com/bluesky-social/indigo/api/atproto" 8 "github.com/bluesky-social/indigo/backfill" 9 "github.com/bluesky-social/indigo/xrpc" 10) 11 12type CensusService struct { 13 cursor *CursorService 14 backfill *backfill.Backfiller 15} 16 17type jobMaker interface { 18 GetOrCreateJob(context.Context, string, string) (backfill.Job, error) 19} 20 21func NewCensusService(cursorSvc *CursorService, backfillSvc *backfill.Backfiller) *CensusService { 22 return &CensusService{ 23 cursor: cursorSvc, 24 backfill: backfillSvc, 25 } 26} 27 28func (cs *CensusService) Start(ctx context.Context) { 29 xrpcc := &xrpc.Client{ 30 Host: "https://bsky.network", 31 } 32 33 jmstore, ok := cs.backfill.Store.(jobMaker) 34 if !ok { 35 slog.Error("configured job store doesn't support random job creation") 36 return 37 } 38 39 curs, _ := cs.cursor.Get("repos") 40 for { 41 select { 42 case <-ctx.Done(): 43 slog.Info("stopping repo census") 44 return 45 default: 46 } 47 48 slog.Info("listing repos", "cursor", curs) 49 res, err := atproto.SyncListRepos(ctx, xrpcc, curs, 1000) 50 if err != nil { 51 slog.Error("error listing repos", "err", err) 52 return 53 } 54 55 for _, repo := range res.Repos { 56 _, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued) 57 if err != nil { 58 slog.Error("error adding listed repo to backfiller", "err", err) 59 } 60 } 61 62 if res.Cursor != nil && *res.Cursor != "" { 63 cs.cursor.reposLk.Lock() 64 curs = *res.Cursor 65 cs.cursor.reposSeq = curs 66 cs.cursor.reposLk.Unlock() 67 } else { 68 break 69 } 70 } 71}