An app.bsky.* indexer.
1package main 2 3import ( 4 "context" 5 "log" 6 "log/slog" 7 "net/http" 8 "os" 9 "os/signal" 10 "syscall" 11 "time" 12 13 "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/backfill" 15 "github.com/bluesky-social/indigo/events" 16 "github.com/bluesky-social/indigo/events/schedulers/parallel" 17 18 "github.com/gorilla/websocket" 19 "gorm.io/driver/sqlite" 20 "gorm.io/gorm" 21 "gorm.io/gorm/logger" 22) 23 24func NewDatabase() *gorm.DB { 25 sl := slog.With("source", "database") 26 27 newLogger := logger.New( 28 log.New(os.Stdout, "\n", log.LstdFlags), 29 logger.Config{ 30 SlowThreshold: 1 * time.Second, 31 Colorful: false, 32 }, 33 ) 34 db, err := gorm.Open(sqlite.Open("state.db"), &gorm.Config{ 35 Logger: newLogger, 36 }) 37 if err != nil { 38 sl.Error("failed to connect to database", "err", err) 39 } 40 db.AutoMigrate(&backfill.GormDBJob{}) 41 db.AutoMigrate(&cursorRecord{}) 42 return db 43} 44 45func NewBackfiller(db *gorm.DB) *backfill.Backfiller { 46 opts := &backfill.BackfillOptions{ 47 // ParallelBackfills: 50, 48 // ParallelRecordCreates: 25, 49 // SyncRequestsPerSecond: 25, 50 51 ParallelBackfills: 10, 52 ParallelRecordCreates: 5, 53 SyncRequestsPerSecond: 5, 54 55 RelayHost: "https://bsky.network", 56 } 57 return backfill.NewBackfiller( 58 "backfills", 59 backfill.NewGormstore(db), 60 handleCreate, 61 handleUpdate, 62 handleDelete, 63 opts, 64 ) 65} 66 67func NewFirehose(backend *Backend) *websocket.Conn { 68 sl := slog.With("source", "firehose") 69 70 subscribeUrl := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos" 71 72 cursor, err := backend.LoadCursor("firehose") 73 if err != nil { 74 sl.Error("failed loading firehose cursor", "err", err) 75 } 76 if cursor != "" { 77 subscribeUrl += "?cursor=" + cursor 78 } 79 80 conn, _, err := websocket.DefaultDialer.Dial(subscribeUrl, http.Header{ 81 "User-Agent": []string{"backfiller/0.1 (@edavis.dev)"}, 82 }) 83 if err != nil { 84 sl.Error("failed to connect to relay", 
"err", err) 85 } 86 return conn 87} 88 89func NewScheduler(ctx context.Context, backend *Backend) *parallel.Scheduler { 90 rsc := events.RepoStreamCallbacks{ 91 RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { 92 return backend.RepoCommitHandler(ctx, evt) 93 }, 94 } 95 return parallel.NewScheduler(16, 100, "firehose", rsc.EventHandler) 96} 97 98func main() { 99 ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM) 100 defer stop() 101 102 sl := slog.With("source", "backfiller") 103 104 db := NewDatabase() 105 106 bf := NewBackfiller(db) 107 go bf.Start() 108 109 backend := NewBackend(db, bf) 110 go backend.SyncCursors() 111 go backend.PumpRepos(ctx) 112 113 conn := NewFirehose(backend) 114 sched := NewScheduler(ctx, backend) 115 if err := events.HandleRepoStream(ctx, conn, sched, sl); err != nil { 116 sl.Error("failed to start scheduler", "err", err) 117 } 118 <-ctx.Done() 119 bf.Stop(context.TODO()) 120}