this repo has no description

fix: backporessure for car reading

Changed files
+44 -10
+44 -10
backfiller.go
···
"fmt"
"io"
"net/http"
+
"runtime"
"strings"
"sync"
"time"
···
"go.uber.org/ratelimit"
)
+
type ProcessJob struct {
+
did string
+
repoBytes []byte
+
}
+
type RepoDownloader struct {
-
clients map[string]*http.Client
-
rateLimits map[string]ratelimit.Limiter
-
mu sync.RWMutex
-
p *Photocopy
+
clients map[string]*http.Client
+
rateLimits map[string]ratelimit.Limiter
+
processChan chan ProcessJob
+
mu sync.RWMutex
+
p *Photocopy
}
func NewRepoDownloader(p *Photocopy) *RepoDownloader {
return &RepoDownloader{
-
clients: make(map[string]*http.Client),
-
rateLimits: make(map[string]ratelimit.Limiter),
-
p: p,
+
clients: make(map[string]*http.Client),
+
rateLimits: make(map[string]ratelimit.Limiter),
+
p: p,
+
processChan: make(chan ProcessJob, 1000),
}
}
···
bs := atproto_repo.NewTinyBlockstore()
cs, err := car.NewCarReader(bytes.NewReader(b))
if err != nil {
+
fmt.Println("error opening car", err)
return fmt.Errorf("error opening car: %v\n", err)
}
···
r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0])
if err != nil || r == nil {
+
fmt.Println("error opening repo", err)
fmt.Printf("could not open repo: %v", err)
return nil
}
···
Status string `json:"status"`
}
+
func (p *Photocopy) runProcessRepoWorker(ctx context.Context, jobs <-chan ProcessJob) {
+
for j := range jobs {
+
p.processRepo(ctx, j.repoBytes, j.did)
+
}
+
}
+
func (p *Photocopy) runBackfiller(ctx context.Context) error {
startTime := time.Now()
fmt.Println("querying clickhouse for dids and services...")
+
var alreadyFetched []string
+
if err := p.conn.Select(ctx, &alreadyFetched, "SELECT DISTINCT(did) FROM default.record WHERE created_at < '2025-07-01'"); err != nil {
+
return err
+
}
+
+
alreadyFetchedMap := map[string]bool{}
+
for _, d := range alreadyFetched {
+
alreadyFetchedMap[d] = true
+
}
+
+
fmt.Println("getting dids")
+
var hostsCursor string
var sevs []ListServicesResponseItem
for {
···
downloader := NewRepoDownloader(p)
serviceDids := map[string][]string{}
+
for range runtime.NumCPU() / 2 {
+
go p.runProcessRepoWorker(ctx, downloader.processChan)
+
}
+
wg := sync.WaitGroup{}
mplk := sync.Mutex{}
for s := range servicesDids {
···
}
dids := []string{}
for _, r := range repos {
+
if alreadyFetchedMap[r.Did] {
+
skipped++
+
continue
+
}
dids = append(dids, r.Did)
}
mplk.Lock()
···
}
go func(b []byte, did string) {
-
if err := p.processRepo(ctx, b, did); err != nil {
-
fmt.Printf("error processing backfill record: %v\n", err)
-
}
+
downloader.processChan <- ProcessJob{repoBytes: b, did: did}
}(b, did)
processed++