an app.bsky.* indexer
1package main 2 3import ( 4 "context" 5 "log" 6 "log/slog" 7 "net/http" 8 "os" 9 "os/signal" 10 "syscall" 11 "time" 12 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/backfill" 15 "github.com/bluesky-social/indigo/events" 16 "github.com/bluesky-social/indigo/events/schedulers/parallel" 17 18 "github.com/gorilla/websocket" 19 "gorm.io/driver/sqlite" 20 "gorm.io/gorm" 21 "gorm.io/gorm/logger" 22) 23 24func NewDatabase() *gorm.DB { 25 sl := slog.With("source", "database") 26 newLogger := logger.New( 27 log.New(os.Stdout, "\r\n", log.LstdFlags), 28 logger.Config{ 29 SlowThreshold: 1 * time.Second, 30 Colorful: false, 31 }, 32 ) 33 db, err := gorm.Open(sqlite.Open("state.db"), &gorm.Config{ 34 Logger: newLogger, 35 }) 36 if err != nil { 37 sl.Error("failed to connect to database", "err", err) 38 } 39 db.AutoMigrate(&backfill.GormDBJob{}) 40 db.AutoMigrate(&cursorRecord{}) 41 42 return db 43} 44 45func NewBackfiller( 46 db *gorm.DB, create handleOpCreateUpdate, update handleOpCreateUpdate, delete handleOpDelete, 47) *backfill.Backfiller { 48 opts := &backfill.BackfillOptions{ 49 // ParallelBackfills: 50, 50 // ParallelRecordCreates: 25, 51 // SyncRequestsPerSecond: 25, 52 53 ParallelBackfills: 10, 54 ParallelRecordCreates: 1, // sqlite 55 SyncRequestsPerSecond: 5, 56 57 RelayHost: "https://bsky.network", 58 } 59 60 return backfill.NewBackfiller( 61 "backfills", 62 backfill.NewGormstore(db), 63 create, update, delete, 64 opts, 65 ) 66} 67 68func NewFirehose(ctx context.Context, cursor string) *websocket.Conn { 69 sl := slog.With("source", "firehose") 70 subscribeUrl := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos" 71 if cursor != "" { 72 subscribeUrl += "?cursor=" + cursor 73 } 74 75 conn, _, err := websocket.DefaultDialer.DialContext(ctx, subscribeUrl, http.Header{ 76 "User-Agent": []string{"backfiller/0.1 (@edavis.dev)"}, 77 }) 78 if err != nil { 79 sl.Error("failed to connect to relay", "err", err) 80 } 81 82 return conn 83} 84 85func NewScheduler( 86 ctx context.Context, commitCallback commitHandler, 87) *parallel.Scheduler { 88 rsc := events.RepoStreamCallbacks{ 89 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 90 return commitCallback(ctx, evt) 91 }, 92 } 93 94 return parallel.NewScheduler(16, 100, "firehose", rsc.EventHandler) 95} 96 97func main() { 98 sl := slog.With("source", "backfiller") 99 100 streamClosed := make(chan struct{}) 101 streamCtx, streamCancel := context.WithCancel(context.Background()) 102 103 db := NewDatabase() 104 backend := NewBackend(db) 105 106 bf := NewBackfiller(db, backend.HandleCreateOp, backend.HandleUpdateOp, backend.HandleDeleteOp) 107 go bf.Start() 108 109 // attach the backfiller to the backend so pump and repo commit handler can use it 110 backend.bf = bf 111 112 go backend.SyncCursors(streamCtx) 113 114 cursor, err := backend.LoadCursor("firehose") 115 if err != nil { 116 sl.Error("failed loading firehose cursor", "err", err) 117 } 118 conn := NewFirehose(streamCtx, cursor) 119 120 sched := NewScheduler(streamCtx, backend.RepoCommitHandler) 121 go func() { 122 if err := events.HandleRepoStream(streamCtx, conn, sched, sl); err != nil { 123 sl.Error("failed to start scheduler", "err", err) 124 } 125 close(streamClosed) 126 }() 127 128 go func() { 129 if err := backend.PumpRepos(streamCtx); err != nil { 130 sl.Error("failed pumping repos", "err", err) 131 } 132 }() 133 134 quit := make(chan struct{}) 135 exitSignals := make(chan os.Signal, 1) 136 signal.Notify(exitSignals, syscall.SIGINT, syscall.SIGTERM) 137 go func() { 138 select { 139 case sig := <-exitSignals: 140 sl.Info("received OS exit signal", "signal", sig) 141 case <-streamClosed: 142 // 143 } 144 145 conn.Close() 146 147 streamCancel() 148 <-streamClosed 149 150 time.Sleep(time.Millisecond * 100) 151 152 endctx, cancel := context.WithTimeout(context.TODO(), time.Minute) 153 defer cancel() 154 bf.Stop(endctx) 155 156 if err := backend.FlushCursors(); err != nil { 157 sl.Error("final flush cursor failed", "err", err) 158 } 159 160 close(quit) 161 }() 162 163 <-quit 164 165 if err := backend.FlushCursors(); err != nil { 166 sl.Error("failed to flush cursors on close", "err", err) 167 } 168}