// An app.bsky.* indexer.
1package main
2
3import (
4 "context"
5 "log"
6 "log/slog"
7 "net/http"
8 "os"
9 "os/signal"
10 "syscall"
11 "time"
12
13 "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/backfill"
15 "github.com/bluesky-social/indigo/events"
16 "github.com/bluesky-social/indigo/events/schedulers/parallel"
17
18 "github.com/gorilla/websocket"
19 "gorm.io/driver/sqlite"
20 "gorm.io/gorm"
21 "gorm.io/gorm/logger"
22)
23
24func NewDatabase() *gorm.DB {
25 sl := slog.With("source", "database")
26
27 newLogger := logger.New(
28 log.New(os.Stdout, "\n", log.LstdFlags),
29 logger.Config{
30 SlowThreshold: 1 * time.Second,
31 Colorful: false,
32 },
33 )
34 db, err := gorm.Open(sqlite.Open("state.db"), &gorm.Config{
35 Logger: newLogger,
36 })
37 if err != nil {
38 sl.Error("failed to connect to database", "err", err)
39 }
40 db.AutoMigrate(&backfill.GormDBJob{})
41 db.AutoMigrate(&cursorRecord{})
42 return db
43}
44
45func NewBackfiller(db *gorm.DB) *backfill.Backfiller {
46 opts := &backfill.BackfillOptions{
47 // ParallelBackfills: 50,
48 // ParallelRecordCreates: 25,
49 // SyncRequestsPerSecond: 25,
50
51 ParallelBackfills: 10,
52 ParallelRecordCreates: 5,
53 SyncRequestsPerSecond: 5,
54
55 RelayHost: "https://bsky.network",
56 }
57 return backfill.NewBackfiller(
58 "backfills",
59 backfill.NewGormstore(db),
60 handleCreate,
61 handleUpdate,
62 handleDelete,
63 opts,
64 )
65}
66
67func NewFirehose(backend *Backend) *websocket.Conn {
68 sl := slog.With("source", "firehose")
69
70 subscribeUrl := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
71
72 cursor, err := backend.LoadCursor("firehose")
73 if err != nil {
74 sl.Error("failed loading firehose cursor", "err", err)
75 }
76 if cursor != "" {
77 subscribeUrl += "?cursor=" + cursor
78 }
79
80 conn, _, err := websocket.DefaultDialer.Dial(subscribeUrl, http.Header{
81 "User-Agent": []string{"backfiller/0.1 (@edavis.dev)"},
82 })
83 if err != nil {
84 sl.Error("failed to connect to relay", "err", err)
85 }
86 return conn
87}
88
89func NewScheduler(ctx context.Context, backend *Backend) *parallel.Scheduler {
90 rsc := events.RepoStreamCallbacks{
91 RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
92 return backend.RepoCommitHandler(ctx, evt)
93 },
94 }
95 return parallel.NewScheduler(16, 100, "firehose", rsc.EventHandler)
96}
97
98func main() {
99 ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
100 defer stop()
101
102 sl := slog.With("source", "backfiller")
103
104 db := NewDatabase()
105
106 bf := NewBackfiller(db)
107 go bf.Start()
108
109 backend := NewBackend(db, bf)
110 go backend.SyncCursors()
111 go backend.PumpRepos(ctx)
112
113 conn := NewFirehose(backend)
114 sched := NewScheduler(ctx, backend)
115 if err := events.HandleRepoStream(ctx, conn, sched, sl); err != nil {
116 sl.Error("failed to start scheduler", "err", err)
117 }
118 <-ctx.Done()
119 bf.Stop(context.TODO())
120}