An app.bsky.* indexer.
1package main
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "log"
8 "log/slog"
9 "net/http"
10 "os"
11 "os/signal"
12 "syscall"
13 "time"
14
15 "github.com/bluesky-social/indigo/api/atproto"
16 comatproto "github.com/bluesky-social/indigo/api/atproto"
17 "github.com/bluesky-social/indigo/backfill"
18 "github.com/bluesky-social/indigo/events"
19 "github.com/bluesky-social/indigo/events/schedulers/parallel"
20 "github.com/bluesky-social/indigo/xrpc"
21
22 "github.com/gorilla/websocket"
23 "github.com/ipfs/go-cid"
24 "gorm.io/driver/sqlite"
25 "gorm.io/gorm"
26 "gorm.io/gorm/logger"
27)
28
29func handleCreate(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error {
30 return nil
31}
32
33func handleUpdate(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error {
34 return nil
35}
36
// handleDelete is the record-delete callback handed to the backfiller.
// It is currently a placeholder that accepts and ignores every deletion.
// Its arguments are, in order: ctx, repo, rev and record path.
func handleDelete(_ context.Context, _, _, _ string) error {
	return nil
}
40
41func main() {
42 ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
43 defer stop()
44
45 sl := slog.With("source", "backfiller")
46
47 newLogger := logger.New(
48 log.New(os.Stdout, "\n", log.LstdFlags),
49 logger.Config{
50 SlowThreshold: 5*time.Second,
51 Colorful: false,
52 },
53 )
54 db, err := gorm.Open(sqlite.Open("state.db"), &gorm.Config{
55 Logger: newLogger,
56 })
57 if err != nil {
58 sl.Error("failed to connect to database", "err", err)
59 }
60 db.AutoMigrate(&backfill.GormDBJob{})
61 store := backfill.NewGormstore(db)
62
63 // connect to the relay
64 con, _, err := websocket.DefaultDialer.Dial("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos", http.Header{
65 "User-Agent": []string{"backfiller/0.1 (@edavis.dev)"},
66 })
67 if err != nil {
68 sl.Error("failed to connect to relay", "err", err)
69 }
70
71 // start backfilling
72 opts := &backfill.BackfillOptions{
73 ParallelBackfills: 10,
74 ParallelRecordCreates: 100,
75 SyncRequestsPerSecond: 4,
76 RelayHost: "https://bsky.network",
77 }
78 bf := backfill.NewBackfiller("backfills", store, handleCreate, handleUpdate, handleDelete, opts)
79 go bf.Start()
80
81 // pump repos
82 go func(bf *backfill.Backfiller) {
83 if err := pumpRepos(context.TODO(), bf); err != nil {
84 sl.Error("failed pumping repos", "err", err)
85 }
86 }(bf)
87
88 // read from the firehose
89 rsc := events.RepoStreamCallbacks{
90 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
91 job, err := bf.Store.GetJob(ctx, evt.Repo)
92 if job == nil {
93 if errors.Is(err, backfill.ErrJobNotFound) {
94 return nil
95 } else {
96 return fmt.Errorf("error getting job: %w", err)
97 }
98 } else {
99 return bf.HandleEvent(ctx, evt)
100 }
101 },
102 }
103 sched := parallel.NewScheduler(16, 100, "firehose", rsc.EventHandler)
104 if err := events.HandleRepoStream(ctx, con, sched, nil); err != nil {
105 sl.Error("failed to start scheduler", "err", err)
106 }
107
108 <-ctx.Done()
109 bf.Stop(context.TODO())
110}
111
112type jobMaker interface {
113 GetOrCreateJob(context.Context, string, string) (backfill.Job, error)
114}
115
116func pumpRepos(ctx context.Context, bf *backfill.Backfiller) error {
117 sl := slog.With("source", "pumpRepos")
118
119 xrpcc := &xrpc.Client{
120 Host: "https://bsky.network",
121 }
122
123 jmstore, ok := bf.Store.(jobMaker)
124 if !ok {
125 return fmt.Errorf("configured job store doesn't support random job creation")
126 }
127
128 var curs string
129 for {
130 sl.Info("listing repos", "cursor", curs)
131 res, err := atproto.SyncListRepos(ctx, xrpcc, curs, 1000)
132 if err != nil {
133 return fmt.Errorf("error listing repos: %w", err)
134 }
135
136 for _, repo := range res.Repos {
137 _, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
138 if err != nil {
139 sl.Warn("failed to create backfill job", "err", err)
140 continue
141 }
142 }
143
144 if res.Cursor != nil && *res.Cursor != "" {
145 curs = *res.Cursor
146 } else {
147 break
148 }
149 }
150
151 return nil
152}