an app.bsky.* indexer
package main

import (
	"context"
	"log/slog"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	comatproto "github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/backfill"
	"github.com/bluesky-social/indigo/events"
	"github.com/bluesky-social/indigo/events/schedulers/parallel"

	"github.com/gorilla/websocket"
	"gorm.io/gorm"
)

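// NewBackfiller wires the create/update/delete handlers into an indigo
// backfill.Backfiller backed by a Gorm job store. Parallelism is kept low
// because the backing databases are SQLite.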
func NewBackfiller(
	db *gorm.DB, create handleOpCreateUpdate, update handleOpCreateUpdate, delete handleOpDelete,
) *backfill.Backfiller {
	opts := &backfill.BackfillOptions{
		// ParallelBackfills:     50,
		// ParallelRecordCreates: 25,
		// SyncRequestsPerSecond: 25,

		ParallelBackfills:     10,
		ParallelRecordCreates: 1, // sqlite
		SyncRequestsPerSecond: 5,

		// NSIDFilter: "app.bsky.feed.generator",
		RelayHost: "https://bsky.network",
	}

	return backfill.NewBackfiller(
		"backfills",
		backfill.NewGormstore(db),
		create, update, delete,
		opts,
	)
}

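// NewFirehose dials the relay's com.atproto.sync.subscribeRepos endpoint,
// resuming from the given cursor when one is provided.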
func NewFirehose(ctx context.Context, cursor string) *websocket.Conn {
	sl := slog.With("source", "firehose")
	subscribeUrl := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
	if cursor != "" {
		subscribeUrl += "?cursor=" + cursor
	}

	conn, _, err := websocket.DefaultDialer.DialContext(ctx, subscribeUrl, http.Header{
		"User-Agent": []string{"backfiller/0.1 (@edavis.dev)"},
	})
	if err != nil {
		// exit instead of returning a nil connection, which would panic
		// later in HandleRepoStream and conn.Close
		sl.Error("failed to connect to relay", "err", err)
		os.Exit(1)
	}

	return conn
}

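// NewScheduler builds a parallel event scheduler that feeds firehose commit
// events into the backend's commit callback.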
func NewScheduler(
	ctx context.Context, commitCallback commitHandler,
) *parallel.Scheduler {
	rsc := events.RepoStreamCallbacks{
		RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
			return commitCallback(ctx, evt)
		},
	}

	return parallel.NewScheduler(16, 100, "firehose", rsc.EventHandler)
}

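// main opens the state and content databases, starts the backfiller, the
// cursor sync, the firehose consumer, and the repo pump, then shuts
// everything down on SIGINT/SIGTERM or when the stream closes.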
func main() {
	sl := slog.With("source", "backfiller")

	streamClosed := make(chan struct{})
	streamCtx, streamCancel := context.WithCancel(context.Background())

	stateDb := NewDatabase("state.db")
	stateDb.AutoMigrate(&backfill.GormDBJob{})
	stateDb.AutoMigrate(&cursorRecord{})

	contentDb := NewDatabase("database.db")
	contentDb.AutoMigrate(&FeedGenerator{})

	backend := NewBackend(stateDb, contentDb)

	bf := NewBackfiller(stateDb, backend.HandleCreateOp, backend.HandleUpdateOp, backend.HandleDeleteOp)
	go bf.Start()

	// attach the backfiller to the backend so the cursor sync, the repo
	// pump, and the commit callback can use it
	backend.bf = bf

	go backend.SyncCursors(streamCtx)

	cursor, err := backend.LoadCursor("firehose")
	if err != nil {
		sl.Error("failed loading firehose cursor", "err", err)
	}
	conn := NewFirehose(streamCtx, cursor)

	sched := NewScheduler(streamCtx, backend.RepoCommitHandler)
	go func() {
		if err := events.HandleRepoStream(streamCtx, conn, sched, sl); err != nil {
			sl.Error("repo stream exited with error", "err", err)
		}
		close(streamClosed)
	}()

	go func() {
		if err := backend.PumpRepos(streamCtx); err != nil {
			sl.Error("failed pumping repos", "err", err)
		}
	}()

	quit := make(chan struct{})
	exitSignals := make(chan os.Signal, 1)
	signal.Notify(exitSignals, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		select {
		case sig := <-exitSignals:
			sl.Info("received OS exit signal", "signal", sig)
		case <-streamClosed:
			// stream closed on its own; fall through to shutdown
		}

		conn.Close()

		streamCancel()
		<-streamClosed

		time.Sleep(time.Millisecond * 100)

		endctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
		defer cancel()
		bf.Stop(endctx)

		close(quit)
	}()

	<-quit

	sl.Info("flushing cursors")
	if err := backend.FlushCursors(); err != nil {
		sl.Error("failed to flush cursors on close", "err", err)
	}

	sl.Info("closing databases")
	if err := backend.CleanlyClose(); err != nil {
		sl.Error("failed to close databases", "err", err)
	}
}