an app.bsky.* indexer
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7
8 "github.com/bluesky-social/indigo/api/atproto"
9 "github.com/bluesky-social/indigo/backfill"
10 "github.com/bluesky-social/indigo/xrpc"
11)
12
13type jobMaker interface {
14 GetOrCreateJob(context.Context, string, string) (backfill.Job, error)
15}
16
17func (b *Backend) PumpRepos(ctx context.Context) error {
18 sl := slog.With("source", "pumpRepos")
19
20 if err := pumpRepos(ctx, b); err != nil {
21 sl.Error("failed pumping repos", "err", err)
22 }
23
24 return nil
25}
26
27func pumpRepos(ctx context.Context, backend *Backend) error {
28 sl := slog.With("source", "pumpRepos")
29 bf := backend.bf
30
31 xrpcc := &xrpc.Client{
32 Host: "https://bsky.network",
33 }
34
35 jmstore, ok := bf.Store.(jobMaker)
36 if !ok {
37 return fmt.Errorf("configured job store doesn't support random job creation")
38 }
39
40 curs, err := backend.LoadCursor("repos")
41 if err != nil {
42 sl.Error("failed to load repos cursor", "err", err)
43 }
44
45 for {
46 select {
47 case <-ctx.Done():
48 sl.Info("stopping repo pump")
49 return nil
50 default:
51 }
52
53 sl.Info("listing repos", "cursor", curs)
54 res, err := atproto.SyncListRepos(ctx, xrpcc, curs, 1000)
55 if err != nil {
56 return fmt.Errorf("error listing repos: %w", err)
57 }
58
59 for _, repo := range res.Repos {
60 _, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
61 if err != nil {
62 sl.Warn("failed to create backfill job", "err", err)
63 continue
64 }
65 }
66
67 if res.Cursor != nil && *res.Cursor != "" {
68 curs = *res.Cursor
69 backend.reposLk.Lock()
70 backend.reposSeq = curs
71 backend.reposLk.Unlock()
72 } else {
73 break
74 }
75 }
76
77 return nil
78}