an app.bsky.* indexer
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "os/signal"
8 "syscall"
9 "time"
10
11 comatproto "github.com/bluesky-social/indigo/api/atproto"
12 "github.com/bluesky-social/indigo/backfill"
13 "github.com/bluesky-social/indigo/events"
14 "github.com/bluesky-social/indigo/events/schedulers/parallel"
15 "github.com/gorilla/websocket"
16 "gorm.io/gorm"
17)
18
19type App struct {
20 backfill *backfill.Backfiller
21 cursor *CursorService
22 handler *HandlerService
23 census *CensusService
24 wsconn *websocket.Conn
25 state *gorm.DB
26 content *gorm.DB
27}
28
29func NewApp() *App {
30 stateDatabase := NewDatabase("state.db")
31 stateDatabase.AutoMigrate(&backfill.GormDBJob{})
32
33 contentDatabase := NewDatabase("content.db")
34
35 return &App{
36 state: stateDatabase,
37 content: contentDatabase,
38 }
39}
40
41func (app *App) Start(ctx context.Context) error {
42 app.cursor = NewCursorService(app.state)
43 go app.cursor.CheckpointCursors(ctx)
44
45 app.handler = NewHandlerService(app.content)
46
47 app.backfill = NewBackfillService(backfill.NewGormstore(app.state), app.handler)
48 go app.backfill.Start()
49
50 app.census = NewCensusService(app.cursor, app.backfill)
51 go app.census.Start(ctx)
52
53 wsconn, err := NewFirehoseConnection(ctx, app.cursor)
54 if err != nil {
55 return fmt.Errorf("error connecting to relay: %w", err)
56 }
57 app.wsconn = wsconn
58
59 rsc := events.RepoStreamCallbacks{
60 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
61 app.cursor.SetFirehoseCursor(evt.Seq)
62 return app.backfill.HandleEvent(ctx, evt)
63 },
64 // TODO account
65 // TODO identity
66 }
67
68 sched := parallel.NewScheduler(4, 50, "firehose", rsc.EventHandler)
69
70 if err := events.HandleRepoStream(ctx, app.wsconn, sched, nil); err != nil {
71 return fmt.Errorf("error starting repo stream handler: %w", err)
72 }
73
74 return nil
75}
76
77func (app *App) Stop(ctx context.Context) error {
78 closeDatabase := func(db *gorm.DB) error {
79 raw, err := db.DB()
80 if err != nil {
81 return fmt.Errorf("error getting raw DB: %w", err)
82 }
83 if err := raw.Close(); err != nil {
84 return fmt.Errorf("error closing DB: %w", err)
85 }
86 return nil
87 }
88
89 app.backfill.Stop(ctx)
90
91 closeDatabase(app.state)
92 closeDatabase(app.content)
93
94 return nil
95}
96
97func main() {
98 ctx, cancel := signal.NotifyContext(context.TODO(), syscall.SIGINT, syscall.SIGTERM)
99 defer cancel()
100
101 app := NewApp()
102 if err := app.Start(ctx); err != nil {
103 slog.Error("failed to start backfiller", "err", err)
104 }
105
106 <-ctx.Done()
107 slog.Info("shutting down")
108
109 endctx, cancel := context.WithTimeout(context.TODO(), time.Second*15)
110 defer cancel()
111
112 if err := app.Stop(endctx); err != nil {
113 slog.Error("error during shutdown", "err", err)
114 }
115}