package photocopy

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	atproto_repo "github.com/bluesky-social/indigo/atproto/repo"
	"github.com/bluesky-social/indigo/repo"
	"github.com/bluesky-social/indigo/util"
	"github.com/ipfs/go-cid"
	"github.com/ipld/go-car"
	_ "github.com/joho/godotenv/autoload"
	"go.uber.org/ratelimit"
)

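// ProcessJob is a unit of work for the repo-processing workers: the account
// DID plus the raw CAR bytes of its downloaded repo.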
type ProcessJob struct {
	did       string
	repoBytes []byte
}

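// RepoDownloader downloads repos from PDS hosts, maintaining one HTTP client
// and one rate limiter per service and handing finished downloads to the
// processing workers over processChan.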
type RepoDownloader struct {
	clients     map[string]*http.Client
	rateLimits  map[string]ratelimit.Limiter
	processChan chan ProcessJob
	mu          sync.RWMutex
	p           *Photocopy
}

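// NewRepoDownloader builds a RepoDownloader with a buffered processing queue.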
func NewRepoDownloader(p *Photocopy) *RepoDownloader {
	return &RepoDownloader{
		clients:     make(map[string]*http.Client),
		rateLimits:  make(map[string]ratelimit.Limiter),
		p:           p,
		processChan: make(chan ProcessJob, 1000),
	}
}

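// getClient returns the cached HTTP client for a service, lazily creating one
// (with a 45s timeout) under double-checked locking.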
func (rd *RepoDownloader) getClient(service string) *http.Client {
	rd.mu.RLock()
	client, exists := rd.clients[service]
	rd.mu.RUnlock()

	if exists {
		return client
	}

	rd.mu.Lock()
	defer rd.mu.Unlock()

	if client, exists := rd.clients[service]; exists {
		return client
	}

	client = util.RobustHTTPClient()
	client.Timeout = 45 * time.Second
	rd.clients[service] = client
	return client
}

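// getRateLimiter returns the rate limiter for a service. All hosts outside
// *.bsky.network share a single "third-party" limiter.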
func (rd *RepoDownloader) getRateLimiter(service string) ratelimit.Limiter {
	if !strings.HasSuffix(service, ".bsky.network") {
		service = "third-party"
	}

	rd.mu.RLock()
	limiter, exists := rd.rateLimits[service]
	rd.mu.RUnlock()

	if exists {
		return limiter
	}

	rd.mu.Lock()
	defer rd.mu.Unlock()

	if limiter, exists := rd.rateLimits[service]; exists {
		return limiter
	}

	// 10 req/sec, i.e. 3000 per five minutes
	limiter = ratelimit.New(10)
	rd.rateLimits[service] = limiter
	return limiter
}

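// downloadRepo fetches the full repo CAR for a DID via com.atproto.sync.getRepo.
// A 400 response is treated as "repo unavailable" and yields (nil, nil) so the
// caller can skip it.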
func (rd *RepoDownloader) downloadRepo(service, did string) ([]byte, error) {
	dlurl := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", service, did)

	req, err := http.NewRequestWithContext(context.TODO(), "GET", dlurl, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}

	if rd.p.ratelimitBypassKey != "" && strings.HasSuffix(service, ".bsky.network") {
		req.Header.Set("x-ratelimit-bypass", rd.p.ratelimitBypassKey)
	}

	client := rd.getClient(service)

	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to download repo: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// a 400 usually means the repo is unavailable (e.g. taken down or
		// deactivated), so signal "skip" rather than an error
		if resp.StatusCode == http.StatusBadRequest {
			return nil, nil
		}
		return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	b, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("could not read bytes from response: %w", err)
	}

	return b, nil
}

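// processRepo loads a downloaded repo CAR into an in-memory blockstore, walks
// every record in it, and passes each record to handleCreate.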
func (p *Photocopy) processRepo(ctx context.Context, b []byte, did string) error {
	bs := atproto_repo.NewTinyBlockstore()
	cs, err := car.NewCarReader(bytes.NewReader(b))
	if err != nil {
		return fmt.Errorf("error opening car: %w", err)
	}

	// load every block from the CAR into the in-memory blockstore
	for {
		blk, err := cs.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return fmt.Errorf("error reading car block: %w", err)
		}
		if err := bs.Put(ctx, blk); err != nil {
			return fmt.Errorf("error storing car block: %w", err)
		}
	}

	r, err := repo.OpenRepo(ctx, bs, cs.Header.Roots[0])
	if err != nil || r == nil {
		fmt.Printf("could not open repo for %s: %v\n", did, err)
		return nil
	}

	if err := r.ForEach(ctx, "", func(key string, rcid cid.Cid) error {
		// record keys look like "<nsid>/<rkey>"
		pts := strings.SplitN(key, "/", 2)
		if len(pts) != 2 {
			return nil
		}
		nsid := pts[0]
		rkey := pts[1]
		cidStr := rcid.String()
		blk, err := bs.Get(ctx, rcid)
		if err != nil {
			return nil
		}
		if err := p.handleCreate(ctx, blk.RawData(), time.Now().Format(time.RFC3339Nano), "unk", did, nsid, rkey, cidStr, "unk"); err != nil {
			return err
		}
		return nil
	}); err != nil {
		return fmt.Errorf("error traversing records: %w", err)
	}

	return nil
}

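// ListReposResponse and ListReposRepo mirror the com.atproto.sync.listRepos
// response body.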
type ListReposResponse struct {
	Cursor string          `json:"cursor"`
	Repos  []ListReposRepo `json:"repos"`
}

type ListReposRepo struct {
	Did    string  `json:"did"`
	Head   string  `json:"head"`
	Rev    string  `json:"rev"`
	Active bool    `json:"active"`
	Status *string `json:"status,omitempty"`
}

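// getDidsFromService pages through com.atproto.sync.listRepos on a host and
// returns every repo that is not deleted, taken down, or deactivated.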
func (rd *RepoDownloader) getDidsFromService(ctx context.Context, service string) ([]ListReposRepo, error) {
	var cursor string
	var repos []ListReposRepo
	// atproto.brid.gy is skipped entirely
	if service == "https://atproto.brid.gy" {
		return nil, nil
	}
	for {
		req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/xrpc/com.atproto.sync.listRepos?limit=1000&cursor=%s", service, cursor), nil)
		if err != nil {
			return nil, err
		}

		if rd.p.ratelimitBypassKey != "" && strings.HasSuffix(service, ".bsky.network") {
			req.Header.Set("x-ratelimit-bypass", rd.p.ratelimitBypassKey)
		}

		rl := rd.getRateLimiter(service)
		rl.Take()

		cli := rd.getClient(service)
		resp, err := cli.Do(req)
		if err != nil {
			return nil, err
		}

		if resp.StatusCode != http.StatusOK {
			resp.Body.Close()
			return nil, fmt.Errorf("received non-200 response code: %d", resp.StatusCode)
		}

		var reposResp ListReposResponse
		if err := json.NewDecoder(resp.Body).Decode(&reposResp); err != nil {
			resp.Body.Close()
			return nil, fmt.Errorf("error decoding repos response: %w", err)
		}
		// close explicitly rather than defer: this loop can page many times
		// before the function returns
		resp.Body.Close()

		for _, r := range reposResp.Repos {
			if r.Status != nil {
				if *r.Status == "deleted" || *r.Status == "takendown" || *r.Status == "deactivated" {
					continue
				}
			}

			repos = append(repos, r)
		}

		if len(reposResp.Repos) != 1000 || reposResp.Cursor == "" {
			break
		}

		fmt.Printf("cursor %s service %s\n", reposResp.Cursor, service)

		cursor = reposResp.Cursor
	}

	return repos, nil
}

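// ListServicesResponse and ListServicesResponseItem mirror the
// com.atproto.sync.listHosts response body.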
type ListServicesResponse struct {
	Cursor string                     `json:"cursor"`
	Hosts  []ListServicesResponseItem `json:"hosts"`
}

type ListServicesResponseItem struct {
	Hostname string `json:"hostname"`
	Status   string `json:"status"`
}

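// runProcessRepoWorker drains the processing queue, parsing each downloaded
// repo and logging any failures.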
func (p *Photocopy) runProcessRepoWorker(ctx context.Context, jobs <-chan ProcessJob) {
	for j := range jobs {
		if err := p.processRepo(ctx, j.repoBytes, j.did); err != nil {
			fmt.Printf("error processing repo %s: %v\n", j.did, err)
		}
	}
}

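// runBackfiller drives a full backfill: it loads already-ingested DIDs from
// ClickHouse, enumerates active hosts from the relay, collects the remaining
// DIDs per host, then downloads and processes every repo while reporting
// progress once a second.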
func (p *Photocopy) runBackfiller(ctx context.Context) error {
	startTime := time.Now()

	fmt.Println("querying clickhouse for dids and services...")

	type alreadyFetchedItem struct {
		Did string `ch:"did"`
	}
	var alreadyFetched []alreadyFetchedItem
	if err := p.conn.Select(ctx, &alreadyFetched, "SELECT DISTINCT(did) FROM default.record WHERE created_at < '2025-07-01'"); err != nil {
		return err
	}

	alreadyFetchedMap := map[string]bool{}
	for _, d := range alreadyFetched {
		alreadyFetchedMap[d.Did] = true
	}

	fmt.Println("listing hosts from relay...")

	var hostsCursor string
	var sevs []ListServicesResponseItem
	for {
		cursorParam := ""
		if hostsCursor != "" {
			cursorParam = "&cursor=" + hostsCursor
		}
		req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("https://relay1.us-east.bsky.network/xrpc/com.atproto.sync.listHosts?limit=1000%s", cursorParam), nil)
		if err != nil {
			return err
		}

		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return err
		}

		if resp.StatusCode != http.StatusOK {
			resp.Body.Close()
			return fmt.Errorf("received non-200 response code: %d", resp.StatusCode)
		}

		var sevsResp ListServicesResponse
		if err := json.NewDecoder(resp.Body).Decode(&sevsResp); err != nil {
			resp.Body.Close()
			return fmt.Errorf("error decoding hosts response: %w", err)
		}
		// close explicitly rather than defer: this loop can page many times
		resp.Body.Close()

		for _, sev := range sevsResp.Hosts {
			if sev.Status != "active" {
				continue
			}

			sevs = append(sevs, sev)
		}

		if len(sevsResp.Hosts) != 1000 || sevsResp.Cursor == "" {
			break
		}

		hostsCursor = sevsResp.Cursor
	}

	servicesDids := map[string][]string{}
	for _, sev := range sevs {
		servicesDids["https://"+sev.Hostname] = []string{}
	}

	fmt.Printf("found %d services\n", len(servicesDids))

	fmt.Println("collecting dids and building download buckets...")

	skipped := 0
	downloader := NewRepoDownloader(p)
	serviceDids := map[string][]string{}

	// start the repo-processing workers that drain downloader.processChan
	for range runtime.NumCPU() / 2 {
		go p.runProcessRepoWorker(ctx, downloader.processChan)
	}

	wg := sync.WaitGroup{}
	mplk := sync.Mutex{}
	for s := range servicesDids {
		wg.Add(1)
		go func() {
			defer wg.Done()
			repos, err := downloader.getDidsFromService(ctx, s)
			if err != nil {
				fmt.Printf("error getting dids for service %s: %v\n", s, err)
				return
			}
			dids := []string{}
			skippedHere := 0
			for _, r := range repos {
				if alreadyFetchedMap[r.Did] {
					skippedHere++
					continue
				}
				dids = append(dids, r.Did)
			}
			mplk.Lock()
			defer mplk.Unlock()
			skipped += skippedHere
			serviceDids[s] = dids
		}()
	}

362 fmt.Println("getting all the repos...")
363 wg.Wait()
364
365 fmt.Printf("was able to skip %d repos\n", skipped)
366
	total := 0

	// count every bucket toward the total, but only print the larger ones
	for service, dids := range serviceDids {
		total += len(dids)
		if len(dids) >= 100 {
			fmt.Printf("%s: %d jobs\n", service, len(dids))
		}
	}

	fmt.Printf("Total jobs: %d across %d services\n", total, len(serviceDids))

	// raise timeouts for full-repo downloads, and raise the rate limit where
	// the bypass key applies
	for _, c := range downloader.clients {
		c.Timeout = 10 * time.Minute
	}

	for s := range downloader.rateLimits {
		if p.ratelimitBypassKey != "" && strings.HasSuffix(s, ".bsky.network") {
			downloader.rateLimits[s] = ratelimit.New(25)
		}
	}

	var processed, errored atomic.Int64
	errMu := sync.Mutex{}
	var errors []error
	for service, dids := range serviceDids {
		go func() {
			for _, did := range dids {
				ratelimiter := downloader.getRateLimiter(service)
				ratelimiter.Take()

				b, err := downloader.downloadRepo(service, did)
				if err != nil {
					errored.Add(1)
					processed.Add(1)
					errMu.Lock()
					errors = append(errors, err)
					errMu.Unlock()
					continue
				}

				if b == nil {
					// downloadRepo returned nil for an unavailable repo; skip it
					processed.Add(1)
					continue
				}

				// hand off on a separate goroutine so a full processChan does
				// not stall the download loop
				go func(b []byte, did string) {
					downloader.processChan <- ProcessJob{repoBytes: b, did: did}
				}(b, did)

				processed.Add(1)
			}
		}()
	}

	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		done := int(processed.Load())
		elapsed := time.Since(startTime)
		rate := float64(done) / elapsed.Seconds()
		remaining := total - done

		var eta string
		if rate > 0 {
			etaSeconds := float64(remaining) / rate
			etaDuration := time.Duration(etaSeconds * float64(time.Second))
			eta = fmt.Sprintf(", ETA: %v", etaDuration.Round(time.Second))
		} else {
			eta = ", ETA: calculating..."
		}

		errMu.Lock()
		for _, err := range errors {
			fmt.Printf("%v\n", err)
		}
		errors = nil
		errMu.Unlock()

		fmt.Printf("\rProgress: %d/%d processed (%.1f%%), %d skipped, %d errors, %.1f jobs/sec%s",
			done, total, float64(done)/float64(total)*100, skipped, errored.Load(), rate, eta)

		// every repo has been downloaded, skipped, or errored; the process
		// workers may still be draining processChan at this point
		if done >= total {
			break
		}
	}

	fmt.Printf("\nCompleted: %d processed, %d errors\n", processed.Load(), errored.Load())

	return nil
}