an app.bsky.* indexer
package main

import (
	"context"
	"log/slog"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	comatproto "github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/backfill"
	"github.com/bluesky-social/indigo/events"
	"github.com/bluesky-social/indigo/events/schedulers/parallel"

	"github.com/gorilla/websocket"
	"gorm.io/gorm"
)

// NewBackfiller wires the create/update/delete handlers into an indigo
// backfill.Backfiller backed by a gorm job store.
func NewBackfiller(
	db *gorm.DB, create handleOpCreateUpdate, update handleOpCreateUpdate, delete handleOpDelete,
) *backfill.Backfiller {
	opts := &backfill.BackfillOptions{
		// ParallelBackfills:     50,
		// ParallelRecordCreates: 25,
		// SyncRequestsPerSecond: 25,

		ParallelBackfills:     10,
		ParallelRecordCreates: 1, // sqlite
		SyncRequestsPerSecond: 5,

		// NSIDFilter: "app.bsky.feed.generator",
		RelayHost: "https://bsky.network",
	}

	return backfill.NewBackfiller(
		"backfills",
		backfill.NewGormstore(db),
		create, update, delete,
		opts,
	)
}

// NewFirehose opens a websocket connection to the relay's
// com.atproto.sync.subscribeRepos endpoint, resuming from cursor if one is given.
func NewFirehose(ctx context.Context, cursor string) *websocket.Conn {
	sl := slog.With("source", "firehose")
	subscribeUrl := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
	if cursor != "" {
		subscribeUrl += "?cursor=" + cursor
	}

	conn, _, err := websocket.DefaultDialer.DialContext(ctx, subscribeUrl, http.Header{
		"User-Agent": []string{"backfiller/0.1 (@edavis.dev)"},
	})
	if err != nil {
		sl.Error("failed to connect to relay", "err", err)
	}

	return conn
}

// NewScheduler builds a parallel scheduler that routes firehose commit
// events to commitCallback.
func NewScheduler(
	ctx context.Context, commitCallback commitHandler,
) *parallel.Scheduler {
	rsc := events.RepoStreamCallbacks{
		RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
			return commitCallback(ctx, evt)
		},
	}

	return parallel.NewScheduler(16, 100, "firehose", rsc.EventHandler)
}

func main() {
	sl := slog.With("source", "backfiller")

	streamClosed := make(chan struct{})
	streamCtx, streamCancel := context.WithCancel(context.Background())

	// state.db holds backfill jobs and cursors; database.db holds the indexed records
	stateDb := NewDatabase("state.db")
	stateDb.AutoMigrate(&backfill.GormDBJob{})
	stateDb.AutoMigrate(&cursorRecord{})

	contentDb := NewDatabase("database.db")
	contentDb.AutoMigrate(&FeedGenerator{})

	backend := NewBackend(stateDb, contentDb)

	bf := NewBackfiller(stateDb, backend.HandleCreateOp, backend.HandleUpdateOp, backend.HandleDeleteOp)
	go bf.Start()

	// attach the backfiller to the backend so the cursors, the pump,
	// and the commit callback can use it
	backend.bf = bf

	go backend.SyncCursors(streamCtx)

	cursor, err := backend.LoadCursor("firehose")
	if err != nil {
		sl.Error("failed loading firehose cursor", "err", err)
	}
	conn := NewFirehose(streamCtx, cursor)

	sched := NewScheduler(streamCtx, backend.RepoCommitHandler)
	go func() {
		if err := events.HandleRepoStream(streamCtx, conn, sched, sl); err != nil {
			sl.Error("failed handling repo stream", "err", err)
		}
		close(streamClosed)
	}()

	go func() {
		if err := backend.PumpRepos(streamCtx); err != nil {
			sl.Error("failed pumping repos", "err", err)
		}
	}()

	quit := make(chan struct{})
	exitSignals := make(chan os.Signal, 1)
	signal.Notify(exitSignals, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		// shut down on either an OS signal or the stream closing on its own
		select {
		case sig := <-exitSignals:
			sl.Info("received OS exit signal", "signal", sig)
		case <-streamClosed:
		}

		conn.Close()

		streamCancel()
		<-streamClosed

		time.Sleep(time.Millisecond * 100)

		// give in-flight backfills up to a minute to wind down
		endctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
		defer cancel()
		bf.Stop(endctx)

		close(quit)
	}()

	<-quit

	sl.Info("flushing cursors")
	if err := backend.FlushCursors(); err != nil {
		sl.Error("failed to flush cursors on close", "err", err)
	}

	sl.Info("closing databases")
	if err := backend.CleanlyClose(); err != nil {
		sl.Error("failed to close databases", "err", err)
	}
}
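
This file leans on helpers defined elsewhere in the package (NewDatabase, NewBackend, cursorRecord, FeedGenerator, and the handleOp*/commitHandler types), which aren't shown here. As context only, here is a minimal sketch of what the two simplest pieces, NewDatabase and cursorRecord, might look like, assuming gorm's sqlite driver; the names, field shapes, and error handling are assumptions, not the actual implementation.

// Hypothetical sketch of helpers referenced above; assumes gorm.io/driver/sqlite.
package main

import (
	"log/slog"
	"os"

	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
)

// cursorRecord persists the last-seen firehose cursor so the stream can
// resume where it left off after a restart (assumed shape; LoadCursor would
// format it as the string appended to the subscribe URL).
type cursorRecord struct {
	ID     uint   `gorm:"primarykey"`
	Name   string `gorm:"uniqueIndex"`
	Cursor int64
}

// NewDatabase opens (or creates) a sqlite database at path and returns the
// gorm handle, exiting if it can't be opened (assumed implementation).
func NewDatabase(path string) *gorm.DB {
	db, err := gorm.Open(sqlite.Open(path), &gorm.Config{})
	if err != nil {
		slog.Error("failed to open database", "path", path, "err", err)
		os.Exit(1)
	}
	return db
}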