an app.bsky.* indexer
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7
8 "github.com/bluesky-social/indigo/api/atproto"
9 "github.com/bluesky-social/indigo/backfill"
10 "github.com/bluesky-social/indigo/xrpc"
11)
12
13type jobMaker interface {
14 GetOrCreateJob(context.Context, string, string) (backfill.Job, error)
15}
16
17func (b *Backend) PumpRepos(ctx context.Context) error {
18 sl := slog.With("source", "pumpRepos")
19
20 if err := pumpRepos(ctx, b); err != nil {
21 sl.Error("failed pumping repos", "err", err)
22 }
23
24 return nil
25}
26
27func pumpRepos(ctx context.Context, backend *Backend) error {
28 sl := slog.With("source", "pumpRepos")
29 bf := backend.bf
30
31 xrpcc := &xrpc.Client{
32 Host: "https://bsky.network",
33 }
34
35 jmstore, ok := bf.Store.(jobMaker)
36 if !ok {
37 return fmt.Errorf("configured job store doesn't support random job creation")
38 }
39
40 cursor, err := backend.LoadCursor("repos")
41 if err != nil {
42 sl.Error("failed to load repos cursor", "err", err)
43 }
44 curs, _ := cursor.(string)
45
46 for {
47 select {
48 case <-ctx.Done():
49 sl.Info("stopping repo pump")
50 return nil
51 default:
52 }
53
54 sl.Info("listing repos", "cursor", curs)
55 res, err := atproto.SyncListRepos(ctx, xrpcc, curs, 1000)
56 if err != nil {
57 return fmt.Errorf("error listing repos: %w", err)
58 }
59
60 for _, repo := range res.Repos {
61 _, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
62 if err != nil {
63 sl.Warn("failed to create backfill job", "err", err)
64 continue
65 }
66 }
67
68 if res.Cursor != nil && *res.Cursor != "" {
69 cursor = *res.Cursor
70 } else {
71 break
72 }
73 }
74
75 return nil
76}