an app.bsky.* indexer
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7
8 "github.com/bluesky-social/indigo/api/atproto"
9 "github.com/bluesky-social/indigo/backfill"
10 "github.com/bluesky-social/indigo/xrpc"
11)
12
13type jobMaker interface {
14 GetOrCreateJob(context.Context, string, string) (backfill.Job, error)
15}
16
17func (b *Backend) PumpRepos(ctx context.Context) error {
18 sl := slog.With("source", "pumpRepos")
19 bf := b.bf
20
21 xrpcc := &xrpc.Client{
22 Host: "https://bsky.network",
23 }
24
25 jmstore, ok := bf.Store.(jobMaker)
26 if !ok {
27 return fmt.Errorf("configured job store doesn't support random job creation")
28 }
29
30 curs, err := b.LoadCursor("repos")
31 if err != nil {
32 sl.Error("failed to load repos cursor", "err", err)
33 }
34
35 for {
36 select {
37 case <-ctx.Done():
38 sl.Info("stopping repo pump")
39 return nil
40 default:
41 //
42 }
43
44 sl.Info("listing repos", "cursor", curs)
45 res, err := atproto.SyncListRepos(ctx, xrpcc, curs, 1000)
46 if err != nil {
47 return fmt.Errorf("error listing repos: %w", err)
48 }
49
50 for _, repo := range res.Repos {
51 _, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
52 if err != nil {
53 sl.Warn("failed to create backfill job", "err", err)
54 continue
55 }
56 }
57
58 if res.Cursor != nil && *res.Cursor != "" {
59 curs = *res.Cursor
60 b.reposLk.Lock()
61 b.reposSeq = curs
62 b.reposLk.Unlock()
63 } else {
64 break
65 }
66 }
67
68 return nil
69}