···
···
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
45
+
sl := slog.With("source", "backfiller")
log.New(os.Stdout, "\n", log.LstdFlags),
···
db, err := gorm.Open(sqlite.Open("state.db"), &gorm.Config{
56
-
log.Fatalf("failed to connect database: %s", err)
58
+
sl.Error("failed to connect to database", "err", err)
db.AutoMigrate(&backfill.GormDBJob{})
store := backfill.NewGormstore(db)
···
"User-Agent": []string{"backfiller/0.1 (@edavis.dev)"},
66
-
log.Fatalf("failed to connect to relay: %s", err)
68
+
sl.Error("failed to connect to relay", "err", err)
···
go func(bf *backfill.Backfiller) {
if err := pumpRepos(context.TODO(), bf); err != nil {
82
-
log.Printf("failed pumping repos: %s", err)
84
+
sl.Error("failed pumping repos", "err", err)
···
sched := parallel.NewScheduler(16, 100, "firehose", rsc.EventHandler)
if err := events.HandleRepoStream(ctx, con, sched, nil); err != nil {
103
-
log.Fatalf("failed to start scheduler: %s", err)
105
+
sl.Error("failed to start scheduler", "err", err)
···
func pumpRepos(ctx context.Context, bf *backfill.Backfiller) error {
117
+
sl := slog.With("source", "pumpRepos")
Host: "https://bsky.network",
···
126
-
log.Printf("listing repos with cursor = %v", curs)
130
+
sl.Info("listing repos", "cursor", curs)
res, err := atproto.SyncListRepos(ctx, xrpcc, curs, 1000)
return fmt.Errorf("error listing repos: %w", err)
···
for _, repo := range res.Repos {
_, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
135
-
log.Printf("failed to create backfill job: %s", err)
139
+
sl.Warn("failed to create backfill job", "err", err)