An app.bsky.* indexer.
1package main
2
3import (
4 "context"
5 "log"
6 "log/slog"
7 "net/http"
8 "os"
9 "os/signal"
10 "syscall"
11 "time"
12
13 comatproto "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/backfill"
15 "github.com/bluesky-social/indigo/events"
16 "github.com/bluesky-social/indigo/events/schedulers/parallel"
17
18 "github.com/gorilla/websocket"
19 "gorm.io/driver/sqlite"
20 "gorm.io/gorm"
21 "gorm.io/gorm/logger"
22)
23
24func NewDatabase() *gorm.DB {
25 sl := slog.With("source", "database")
26 newLogger := logger.New(
27 log.New(os.Stdout, "\r\n", log.LstdFlags),
28 logger.Config{
29 SlowThreshold: 1 * time.Second,
30 Colorful: false,
31 },
32 )
33 db, err := gorm.Open(sqlite.Open("state.db"), &gorm.Config{
34 Logger: newLogger,
35 })
36 if err != nil {
37 sl.Error("failed to connect to database", "err", err)
38 }
39 db.AutoMigrate(&backfill.GormDBJob{})
40 db.AutoMigrate(&cursorRecord{})
41
42 return db
43}
44
45func NewBackfiller(
46 db *gorm.DB, create handleOpCreateUpdate, update handleOpCreateUpdate, delete handleOpDelete,
47) *backfill.Backfiller {
48 opts := &backfill.BackfillOptions{
49 // ParallelBackfills: 50,
50 // ParallelRecordCreates: 25,
51 // SyncRequestsPerSecond: 25,
52
53 ParallelBackfills: 10,
54 ParallelRecordCreates: 1, // sqlite
55 SyncRequestsPerSecond: 5,
56
57 RelayHost: "https://bsky.network",
58 }
59
60 return backfill.NewBackfiller(
61 "backfills",
62 backfill.NewGormstore(db),
63 create, update, delete,
64 opts,
65 )
66}
67
68func NewFirehose(ctx context.Context, cursor string) *websocket.Conn {
69 sl := slog.With("source", "firehose")
70 subscribeUrl := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
71 if cursor != "" {
72 subscribeUrl += "?cursor=" + cursor
73 }
74
75 conn, _, err := websocket.DefaultDialer.DialContext(ctx, subscribeUrl, http.Header{
76 "User-Agent": []string{"backfiller/0.1 (@edavis.dev)"},
77 })
78 if err != nil {
79 sl.Error("failed to connect to relay", "err", err)
80 }
81
82 return conn
83}
84
85func NewScheduler(
86 ctx context.Context, commitCallback commitHandler,
87) *parallel.Scheduler {
88 rsc := events.RepoStreamCallbacks{
89 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
90 return commitCallback(ctx, evt)
91 },
92 }
93
94 return parallel.NewScheduler(16, 100, "firehose", rsc.EventHandler)
95}
96
97func main() {
98 sl := slog.With("source", "backfiller")
99
100 streamClosed := make(chan struct{})
101 streamCtx, streamCancel := context.WithCancel(context.Background())
102
103 db := NewDatabase()
104 backend := NewBackend(db)
105
106 bf := NewBackfiller(db, backend.HandleCreateOp, backend.HandleUpdateOp, backend.HandleDeleteOp)
107 go bf.Start()
108
109 // attach the backfiller to the backend so pump and repo commit handler can use it
110 backend.bf = bf
111
112 go backend.SyncCursors(streamCtx)
113
114 cursor, err := backend.LoadCursor("firehose")
115 if err != nil {
116 sl.Error("failed loading firehose cursor", "err", err)
117 }
118 conn := NewFirehose(streamCtx, cursor)
119
120 sched := NewScheduler(streamCtx, backend.RepoCommitHandler)
121 go func() {
122 if err := events.HandleRepoStream(streamCtx, conn, sched, sl); err != nil {
123 sl.Error("failed to start scheduler", "err", err)
124 }
125 close(streamClosed)
126 }()
127
128 go func() {
129 if err := backend.PumpRepos(streamCtx); err != nil {
130 sl.Error("failed pumping repos", "err", err)
131 }
132 }()
133
134 quit := make(chan struct{})
135 exitSignals := make(chan os.Signal, 1)
136 signal.Notify(exitSignals, syscall.SIGINT, syscall.SIGTERM)
137 go func() {
138 select {
139 case sig := <-exitSignals:
140 sl.Info("received OS exit signal", "signal", sig)
141 case <-streamClosed:
142 //
143 }
144
145 conn.Close()
146
147 streamCancel()
148 <-streamClosed
149
150 time.Sleep(time.Millisecond * 100)
151
152 endctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
153 defer cancel()
154 bf.Stop(endctx)
155
156 if err := backend.FlushCursors(); err != nil {
157 sl.Error("final flush cursor failed", "err", err)
158 }
159
160 close(quit)
161 }()
162
163 <-quit
164
165 if err := backend.FlushCursors(); err != nil {
166 sl.Error("failed to flush cursors on close", "err", err)
167 }
168}