an app.bsky.* indexer
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "os/signal"
8 "syscall"
9 "time"
10
11 comatproto "github.com/bluesky-social/indigo/api/atproto"
12 "github.com/bluesky-social/indigo/backfill"
13 "github.com/bluesky-social/indigo/events"
14 "github.com/bluesky-social/indigo/events/schedulers/parallel"
15 "github.com/gorilla/websocket"
16 "gorm.io/gorm"
17)
18
19type App struct {
20 backfill *backfill.Backfiller
21 cursor *CursorService
22 handler *HandlerService
23 census *CensusService
24 wsconn *websocket.Conn
25 state *gorm.DB
26 content *gorm.DB
27}
28
29func NewApp() *App {
30 stateDatabase := NewDatabase("state.db")
31 stateDatabase.AutoMigrate(&backfill.GormDBJob{})
32
33 contentDatabase := NewDatabase("content.db")
34
35 return &App{
36 state: stateDatabase,
37 content: contentDatabase,
38 }
39}
40
41func (app *App) Start(ctx context.Context) error {
42 app.cursor = NewCursorService(app.state)
43 go app.cursor.CheckpointCursors(ctx)
44
45 app.handler = NewHandlerService(app.content)
46
47 app.backfill = NewBackfillService(backfill.NewGormstore(app.state), app.handler)
48 go app.backfill.Start()
49
50 app.census = NewCensusService(app.cursor, app.backfill)
51 go app.census.Start(ctx)
52
53 wsconn, err := NewFirehoseConnection(ctx, app.cursor)
54 if err != nil {
55 return fmt.Errorf("error connecting to relay: %w", err)
56 }
57 app.wsconn = wsconn
58
59 rsc := events.RepoStreamCallbacks{
60 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
61 app.cursor.SetFirehoseCursor(evt.Seq)
62 return app.backfill.HandleEvent(ctx, evt)
63 },
64 // TODO account
65 // TODO identity
66 }
67
68 sched := parallel.NewScheduler(4, 50, "firehose", rsc.EventHandler)
69
70 if err := events.HandleRepoStream(ctx, app.wsconn, sched, nil); err != nil {
71 return fmt.Errorf("error starting repo stream handler: %w", err)
72 }
73
74 return nil
75}
76
77func (app *App) Stop(ctx context.Context) error {
78 closeDatabase := func(db *gorm.DB) error {
79 raw, err := db.DB()
80 if err != nil {
81 return fmt.Errorf("error getting raw DB: %w", err)
82 }
83 if err := raw.Close(); err != nil {
84 return fmt.Errorf("error closing DB: %w", err)
85 }
86 return nil
87 }
88
89 if err := closeDatabase(app.state); err != nil {
90 return err
91 }
92
93 if err := closeDatabase(app.content); err != nil {
94 return err
95 }
96
97 if err := app.backfill.Stop(ctx); err != nil {
98 return err
99 }
100
101 return nil
102}
103
104func main() {
105 ctx, cancel := signal.NotifyContext(context.TODO(), syscall.SIGINT, syscall.SIGTERM)
106 defer cancel()
107
108 app := NewApp()
109 if err := app.Start(ctx); err != nil {
110 slog.Error("failed to start backfiller", "err", err)
111 }
112
113 <-ctx.Done()
114 slog.Info("shutting down")
115
116 endctx, cancel := context.WithTimeout(context.TODO(), time.Second*15)
117 defer cancel()
118
119 if err := app.Stop(endctx); err != nil {
120 slog.Error("error during shutdown", "err", err)
121 }
122}