// An app.bsky.* indexer.
1package main
2
3import (
4 "context"
5 "log"
6 "log/slog"
7 "net/http"
8 "os"
9 "os/signal"
10 "syscall"
11 "time"
12
13 "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/backfill"
15 "github.com/bluesky-social/indigo/events"
16 "github.com/bluesky-social/indigo/events/schedulers/parallel"
17
18 "github.com/gorilla/websocket"
19 "gorm.io/driver/sqlite"
20 "gorm.io/gorm"
21 "gorm.io/gorm/logger"
22)
23
24func NewDatabase() *gorm.DB {
25 sl := slog.With("source", "database")
26 newLogger := logger.New(
27 log.New(os.Stdout, "\r\n", log.LstdFlags),
28 logger.Config{
29 SlowThreshold: 1 * time.Second,
30 Colorful: false,
31 },
32 )
33 db, err := gorm.Open(sqlite.Open("state.db"), &gorm.Config{
34 Logger: newLogger,
35 })
36 if err != nil {
37 sl.Error("failed to connect to database", "err", err)
38 }
39 db.AutoMigrate(&backfill.GormDBJob{})
40 db.AutoMigrate(&cursorRecord{})
41
42 return db
43}
44
45func NewBackfiller(db *gorm.DB) *backfill.Backfiller {
46 opts := &backfill.BackfillOptions{
47 // ParallelBackfills: 50,
48 // ParallelRecordCreates: 25,
49 // SyncRequestsPerSecond: 25,
50
51 ParallelBackfills: 10,
52 ParallelRecordCreates: 1, // sqlite
53 SyncRequestsPerSecond: 5,
54
55 RelayHost: "https://bsky.network",
56 }
57
58 return backfill.NewBackfiller(
59 "backfills",
60 backfill.NewGormstore(db),
61 handleCreate,
62 handleUpdate,
63 handleDelete,
64 opts,
65 )
66}
67
68func NewFirehose(ctx context.Context, cursor string) *websocket.Conn {
69 sl := slog.With("source", "firehose")
70 subscribeUrl := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
71 if cursor != "" {
72 subscribeUrl += "?cursor=" + cursor
73 }
74
75 conn, _, err := websocket.DefaultDialer.DialContext(ctx, subscribeUrl, http.Header{
76 "User-Agent": []string{"backfiller/0.1 (@edavis.dev)"},
77 })
78 if err != nil {
79 sl.Error("failed to connect to relay", "err", err)
80 }
81
82 return conn
83}
84
85func NewScheduler(ctx context.Context, backend *Backend) *parallel.Scheduler {
86 rsc := events.RepoStreamCallbacks{
87 RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
88 return backend.RepoCommitHandler(ctx, evt)
89 },
90 }
91
92 return parallel.NewScheduler(16, 100, "firehose", rsc.EventHandler)
93}
94
95func main() {
96 sl := slog.With("source", "backfiller")
97
98 streamClosed := make(chan struct{})
99 streamCtx, streamCancel := context.WithCancel(context.Background())
100
101 db := NewDatabase()
102
103 bf := NewBackfiller(db)
104 go bf.Start()
105
106 backend := NewBackend(db, bf)
107 go backend.SyncCursors(streamCtx)
108
109 cursor, err := backend.LoadCursor("firehose")
110 if err != nil {
111 sl.Error("failed loading firehose cursor", "err", err)
112 }
113 conn := NewFirehose(streamCtx, cursor)
114
115 sched := NewScheduler(streamCtx, backend)
116 go func() {
117 if err := events.HandleRepoStream(streamCtx, conn, sched, sl); err != nil {
118 sl.Error("failed to start scheduler", "err", err)
119 }
120 close(streamClosed)
121 }()
122
123 go func() {
124 if err := backend.PumpRepos(streamCtx); err != nil {
125 sl.Error("failed pumping repos", "err", err)
126 } else {
127 sl.Info("finished listing repos, switching over to event stream")
128 }
129 }()
130
131 quit := make(chan struct{})
132 exitSignals := make(chan os.Signal, 1)
133 signal.Notify(exitSignals, syscall.SIGINT, syscall.SIGTERM)
134 go func() {
135 select {
136 case sig := <-exitSignals:
137 sl.Info("received OS exit signal", "signal", sig)
138 case <-streamClosed:
139 //
140 }
141
142 conn.Close()
143
144 streamCancel()
145 <-streamClosed
146
147 time.Sleep(time.Millisecond * 100)
148
149 endctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
150 defer cancel()
151 bf.Stop(endctx)
152
153 if err := backend.FlushCursors(); err != nil {
154 sl.Error("final flush cursor failed", "err", err)
155 }
156
157 close(quit)
158 }()
159
160 <-quit
161
162 if err := backend.FlushCursors(); err != nil {
163 sl.Error("failed to flush cursors on close", "err", err)
164 }
165}