An app.bsky.* indexer
1package main
2
3import (
4 "context"
5 "fmt"
6 "log"
7 "log/slog"
8 "net/http"
9 "os"
10 "os/signal"
11 "syscall"
12 "time"
13
14 "github.com/bluesky-social/indigo/api/atproto"
15 "github.com/bluesky-social/indigo/backfill"
16 "github.com/bluesky-social/indigo/events"
17 "github.com/bluesky-social/indigo/events/schedulers/parallel"
18
19 "github.com/gorilla/websocket"
20 "gorm.io/driver/sqlite"
21 "gorm.io/gorm"
22 "gorm.io/gorm/logger"
23)
24
25func NewDatabase() *gorm.DB {
26 sl := slog.With("source", "database")
27
28 newLogger := logger.New(
29 log.New(os.Stdout, "\n", log.LstdFlags),
30 logger.Config{
31 SlowThreshold: 1 * time.Second,
32 Colorful: false,
33 },
34 )
35 db, err := gorm.Open(sqlite.Open("state.db"), &gorm.Config{
36 Logger: newLogger,
37 })
38 if err != nil {
39 sl.Error("failed to connect to database", "err", err)
40 }
41 db.AutoMigrate(&backfill.GormDBJob{})
42 db.AutoMigrate(&cursorRecord{})
43 return db
44}
45
46func NewBackfiller(db *gorm.DB) *backfill.Backfiller {
47 opts := &backfill.BackfillOptions{
48 ParallelBackfills: 50,
49 ParallelRecordCreates: 25,
50 SyncRequestsPerSecond: 25,
51 RelayHost: "https://bsky.network",
52 }
53 return backfill.NewBackfiller(
54 "backfills",
55 backfill.NewGormstore(db),
56 handleCreate,
57 handleUpdate,
58 handleDelete,
59 opts,
60 )
61}
62
63func NewFirehose(backend *Backend) *websocket.Conn {
64 sl := slog.With("source", "firehose")
65
66 subscribeUrl := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
67
68 cursor, err := backend.LoadCursor("firehose")
69 if err != nil {
70 sl.Error("failed loading firehose cursor", "err", err)
71 }
72 cc, ok := cursor.(int64)
73 if ok {
74 subscribeUrl += fmt.Sprintf("?cursor=%d", cc)
75 }
76
77 conn, _, err := websocket.DefaultDialer.Dial(subscribeUrl, http.Header{
78 "User-Agent": []string{"backfiller/0.1 (@edavis.dev)"},
79 })
80 if err != nil {
81 sl.Error("failed to connect to relay", "err", err)
82 }
83 return conn
84}
85
86func NewScheduler(ctx context.Context, backend *Backend) *parallel.Scheduler {
87 rsc := events.RepoStreamCallbacks{
88 RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
89 return backend.RepoCommitHandler(ctx, evt)
90 },
91 }
92 return parallel.NewScheduler(16, 100, "firehose", rsc.EventHandler)
93}
94
95func main() {
96 ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
97 defer stop()
98
99 sl := slog.With("source", "backfiller")
100
101 db := NewDatabase()
102
103 bf := NewBackfiller(db)
104 go bf.Start()
105
106 backend := NewBackend(db, bf)
107 go backend.SyncCursors()
108 go backend.PumpRepos(ctx)
109
110 conn := NewFirehose(backend)
111 sched := NewScheduler(ctx, backend)
112 if err := events.HandleRepoStream(ctx, conn, sched, sl); err != nil {
113 sl.Error("failed to start scheduler", "err", err)
114 }
115 <-ctx.Done()
116 bf.Stop(context.TODO())
117}