an app.bsky.* indexer
1package main 2 3import ( 4 "context" 5 "log" 6 "log/slog" 7 "net/http" 8 "os" 9 "os/signal" 10 "syscall" 11 "time" 12 13 "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/backfill" 15 "github.com/bluesky-social/indigo/events" 16 "github.com/bluesky-social/indigo/events/schedulers/parallel" 17 18 "github.com/gorilla/websocket" 19 "gorm.io/driver/sqlite" 20 "gorm.io/gorm" 21 "gorm.io/gorm/logger" 22) 23 24func NewDatabase() *gorm.DB { 25 sl := slog.With("source", "database") 26 newLogger := logger.New( 27 log.New(os.Stdout, "\r\n", log.LstdFlags), 28 logger.Config{ 29 SlowThreshold: 1 * time.Second, 30 Colorful: false, 31 }, 32 ) 33 db, err := gorm.Open(sqlite.Open("state.db"), &gorm.Config{ 34 Logger: newLogger, 35 }) 36 if err != nil { 37 sl.Error("failed to connect to database", "err", err) 38 } 39 db.AutoMigrate(&backfill.GormDBJob{}) 40 db.AutoMigrate(&cursorRecord{}) 41 42 return db 43} 44 45func NewBackfiller(db *gorm.DB) *backfill.Backfiller { 46 opts := &backfill.BackfillOptions{ 47 // ParallelBackfills: 50, 48 // ParallelRecordCreates: 25, 49 // SyncRequestsPerSecond: 25, 50 51 ParallelBackfills: 10, 52 ParallelRecordCreates: 1, // sqlite 53 SyncRequestsPerSecond: 5, 54 55 RelayHost: "https://bsky.network", 56 } 57 58 return backfill.NewBackfiller( 59 "backfills", 60 backfill.NewGormstore(db), 61 handleCreate, 62 handleUpdate, 63 handleDelete, 64 opts, 65 ) 66} 67 68func NewFirehose(ctx context.Context, cursor string) *websocket.Conn { 69 sl := slog.With("source", "firehose") 70 subscribeUrl := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos" 71 if cursor != "" { 72 subscribeUrl += "?cursor=" + cursor 73 } 74 75 conn, _, err := websocket.DefaultDialer.DialContext(ctx, subscribeUrl, http.Header{ 76 "User-Agent": []string{"backfiller/0.1 (@edavis.dev)"}, 77 }) 78 if err != nil { 79 sl.Error("failed to connect to relay", "err", err) 80 } 81 82 return conn 83} 84 85func NewScheduler(ctx context.Context, backend *Backend) *parallel.Scheduler { 86 rsc := events.RepoStreamCallbacks{ 87 RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { 88 return backend.RepoCommitHandler(ctx, evt) 89 }, 90 } 91 92 return parallel.NewScheduler(16, 100, "firehose", rsc.EventHandler) 93} 94 95func main() { 96 sl := slog.With("source", "backfiller") 97 98 streamClosed := make(chan struct{}) 99 streamCtx, streamCancel := context.WithCancel(context.Background()) 100 101 db := NewDatabase() 102 103 bf := NewBackfiller(db) 104 go bf.Start() 105 106 backend := NewBackend(db, bf) 107 go backend.SyncCursors(streamCtx) 108 109 cursor, err := backend.LoadCursor("firehose") 110 if err != nil { 111 sl.Error("failed loading firehose cursor", "err", err) 112 } 113 conn := NewFirehose(streamCtx, cursor) 114 115 sched := NewScheduler(streamCtx, backend) 116 go func() { 117 if err := events.HandleRepoStream(streamCtx, conn, sched, sl); err != nil { 118 sl.Error("failed to start scheduler", "err", err) 119 } 120 close(streamClosed) 121 }() 122 123 go func() { 124 if err := backend.PumpRepos(streamCtx); err != nil { 125 sl.Error("failed pumping repos", "err", err) 126 } else { 127 sl.Info("finished listing repos, switching over to event stream") 128 } 129 }() 130 131 quit := make(chan struct{}) 132 exitSignals := make(chan os.Signal, 1) 133 signal.Notify(exitSignals, syscall.SIGINT, syscall.SIGTERM) 134 go func() { 135 select { 136 case sig := <-exitSignals: 137 sl.Info("received OS exit signal", "signal", sig) 138 case <-streamClosed: 139 // 140 } 141 142 conn.Close() 143 144 streamCancel() 145 <-streamClosed 146 147 time.Sleep(time.Millisecond * 100) 148 149 endctx, cancel := context.WithTimeout(context.TODO(), time.Minute) 150 defer cancel() 151 bf.Stop(endctx) 152 153 if err := backend.FlushCursors(); err != nil { 154 sl.Error("final flush cursor failed", "err", err) 155 } 156 157 close(quit) 158 }() 159 160 <-quit 161 162 if err := backend.FlushCursors(); err != nil { 163 sl.Error("failed to flush cursors on close", "err", err) 164 } 165}