an app.bsky.* indexer
1package main
2
3import (
4 "context"
5 "log/slog"
6
7 "github.com/bluesky-social/indigo/api/atproto"
8 "github.com/bluesky-social/indigo/backfill"
9 "github.com/bluesky-social/indigo/xrpc"
10)
11
12type CensusService struct {
13 cursor *CursorService
14 backfill *backfill.Backfiller
15}
16
17type jobMaker interface {
18 GetOrCreateJob(context.Context, string, string) (backfill.Job, error)
19}
20
21func NewCensusService(cursorSvc *CursorService, backfillSvc *backfill.Backfiller) *CensusService {
22 return &CensusService{
23 cursor: cursorSvc,
24 backfill: backfillSvc,
25 }
26}
27
28func (cs *CensusService) Start(ctx context.Context) {
29 xrpcc := &xrpc.Client{
30 Host: "https://bsky.network",
31 }
32
33 jmstore, ok := cs.backfill.Store.(jobMaker)
34 if !ok {
35 slog.Error("configured job store doesn't support random job creation")
36 return
37 }
38
39 curs, _ := cs.cursor.Get("repos")
40 for {
41 select {
42 case <-ctx.Done():
43 slog.Info("stopping repo census")
44 return
45 default:
46 }
47
48 slog.Info("listing repos", "cursor", curs)
49 res, err := atproto.SyncListRepos(ctx, xrpcc, curs, 1000)
50 if err != nil {
51 slog.Error("error listing repos", "err", err)
52 return
53 }
54
55 for _, repo := range res.Repos {
56 _, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
57 if err != nil {
58 slog.Error("error adding listed repo to backfiller", "err", err)
59 }
60 }
61
62 if res.Cursor != nil && *res.Cursor != "" {
63 cs.cursor.reposLk.Lock()
64 curs = *res.Cursor
65 cs.cursor.reposSeq = curs
66 cs.cursor.reposLk.Unlock()
67 } else {
68 break
69 }
70 }
71}