// An app.bsky.* indexer.
1package main
2
import (
	"context"
	"net/http"
	// "strings"
	// "log/slog"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	comatproto "github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/backfill"
	"github.com/bluesky-social/indigo/events"
	"github.com/bluesky-social/indigo/events/schedulers/parallel"

	"github.com/gorilla/websocket"
	"github.com/ipfs/go-cid"
	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
)
23
24func handleCreate(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error {
25 log.Printf("handleCreate: at://%s/%s", repo, path)
26 return nil
27}
28
29func handleUpdate(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error {
30 return nil
31}
32
// handleDelete is the record-delete callback handed to the backfiller.
//
// It is a no-op placeholder: every argument is ignored and the result is
// always nil.
func handleDelete(
	ctx context.Context,
	repo, rev, path string,
) error {
	return nil
}
36
37func main() {
38 ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
39 defer stop()
40
41 db, err := gorm.Open(sqlite.Open("backfiller.db"), &gorm.Config{})
42 if err != nil {
43 log.Fatalf("failed to connect database: %s", err)
44 }
45 db.AutoMigrate(&backfill.GormDBJob{})
46 store := backfill.NewGormstore(db)
47
48 // connect to the relay
49 d := websocket.DefaultDialer
50 con, _, err := d.Dial("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos", http.Header{})
51 if err != nil {
52 log.Fatalf("failed to connect to relay: %s", err)
53 }
54
55 rsc := events.RepoStreamCallbacks{
56 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
57 // log.Printf("enqueuing: %s", evt.Repo)
58 return store.EnqueueJobWithState(ctx, evt.Repo, backfill.StateEnqueued)
59 },
60 }
61
62 // start backfilling
63 opts := &backfill.BackfillOptions{
64 ParallelBackfills: 10,
65 ParallelRecordCreates: 100,
66 NSIDFilter: "app.bsky.labeler.service",
67 SyncRequestsPerSecond: 10,
68 RelayHost: "https://bsky.network",
69 }
70 bf := backfill.NewBackfiller("backfiller-labels", store, handleCreate, handleUpdate, handleDelete, opts)
71 go bf.Start()
72
73 sched := parallel.NewScheduler(4, 1000, "backfiller-labels", rsc.EventHandler)
74 if err := events.HandleRepoStream(ctx, con, sched, nil); err != nil {
75 log.Fatalf("failed to start scheduler: %s", err)
76 }
77
78 <-ctx.Done()
79 bf.Stop(ctx)
80}