1package photocopy
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "net/http"
10 "strings"
11 "sync"
12 "time"
13
14 atproto_repo "github.com/bluesky-social/indigo/atproto/repo"
15 "github.com/bluesky-social/indigo/repo"
16 "github.com/bluesky-social/indigo/util"
17 "github.com/ipfs/go-cid"
18 "github.com/ipld/go-car"
19 _ "github.com/joho/godotenv/autoload"
20 "go.uber.org/ratelimit"
21)
22
23type RepoDownloader struct {
24 clients map[string]*http.Client
25 rateLimits map[string]ratelimit.Limiter
26 mu sync.RWMutex
27 p *Photocopy
28}
29
30func NewRepoDownloader(p *Photocopy) *RepoDownloader {
31 return &RepoDownloader{
32 clients: make(map[string]*http.Client),
33 rateLimits: make(map[string]ratelimit.Limiter),
34 p: p,
35 }
36}
37
38func (rd *RepoDownloader) getClient(service string) *http.Client {
39 rd.mu.RLock()
40 client, exists := rd.clients[service]
41 rd.mu.RUnlock()
42
43 if exists {
44 return client
45 }
46
47 rd.mu.Lock()
48 defer rd.mu.Unlock()
49
50 if client, exists := rd.clients[service]; exists {
51 return client
52 }
53
54 client = util.RobustHTTPClient()
55 client.Timeout = 45 * time.Second
56 rd.clients[service] = client
57 return client
58}
59
60func (rd *RepoDownloader) getRateLimiter(service string) ratelimit.Limiter {
61 if !strings.HasSuffix(service, ".bsky.network") {
62 service = "third-party"
63 }
64
65 rd.mu.RLock()
66 limiter, exists := rd.rateLimits[service]
67 rd.mu.RUnlock()
68
69 if exists {
70 return limiter
71 }
72
73 rd.mu.Lock()
74 defer rd.mu.Unlock()
75
76 if limiter, exists := rd.rateLimits[service]; exists {
77 return limiter
78 }
79
80 // 3000 per five minutes
81 limiter = ratelimit.New(10)
82 rd.rateLimits[service] = limiter
83 return limiter
84}
85
86func (rd *RepoDownloader) downloadRepo(service, did string) ([]byte, error) {
87 dlurl := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", service, did)
88
89 req, err := http.NewRequestWithContext(context.TODO(), "GET", dlurl, nil)
90 if err != nil {
91 return nil, fmt.Errorf("failed to create request: %w", err)
92 }
93
94 if rd.p.ratelimitBypassKey != "" && strings.HasSuffix(service, ".bsky.network") {
95 req.Header.Set("x-ratelimit-bypass", rd.p.ratelimitBypassKey)
96 }
97
98 client := rd.getClient(service)
99
100 resp, err := client.Do(req)
101 if err != nil {
102 return nil, fmt.Errorf("failed to download repo: %w", err)
103 }
104 defer resp.Body.Close()
105
106 if resp.StatusCode != http.StatusOK {
107 if resp.StatusCode == 400 {
108 return nil, nil
109 }
110 return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
111 }
112
113 b, err := io.ReadAll(resp.Body)
114 if err != nil {
115 return nil, fmt.Errorf("could not read bytes from response: %w", err)
116 }
117
118 return b, nil
119}
120
121func (p *Photocopy) processRepo(ctx context.Context, b []byte, did string) error {
122 bs := atproto_repo.NewTinyBlockstore()
123 cs, err := car.NewCarReader(bytes.NewReader(b))
124 if err != nil {
125 return fmt.Errorf("error opening car: %v\n", err)
126 }
127
128 currBlock, _ := cs.Next()
129 for currBlock != nil {
130 bs.Put(context.TODO(), currBlock)
131 next, _ := cs.Next()
132 currBlock = next
133 }
134
135 r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0])
136 if err != nil || r == nil {
137 fmt.Printf("could not open repo: %v", err)
138 return nil
139 }
140
141 if err := r.ForEach(context.TODO(), "", func(key string, cid cid.Cid) error {
142 pts := strings.Split(key, "/")
143 nsid := pts[0]
144 rkey := pts[1]
145 cidStr := cid.String()
146 b, err := bs.Get(context.TODO(), cid)
147 if err != nil {
148 return nil
149 }
150 if err := p.handleCreate(ctx, b.RawData(), time.Now().Format(time.RFC3339Nano), "unk", did, nsid, rkey, cidStr, "unk"); err != nil {
151 return err
152 }
153 return nil
154 }); err != nil {
155 return fmt.Errorf("erorr traversing records: %v", err)
156 }
157
158 return nil
159}
160
161type ListReposResponse struct {
162 Cursor string `json:"cursor"`
163 Repos []ListReposRepo `json:"repos"`
164}
165
// ListReposRepo is a single entry of a ListReposResponse.
type ListReposRepo struct {
	Did    string `json:"did"`
	Head   string `json:"head"`
	Rev    string `json:"rev"`
	Active bool   `json:"active"`
	// Status is nil when the response omits it.
	Status *string `json:"status,omitempty"`
}
173
174func (rd *RepoDownloader) getDidsFromService(ctx context.Context, service string) ([]ListReposRepo, error) {
175 var cursor string
176 var repos []ListReposRepo
177 if service == "https://atproto.brid.gy" {
178 return nil, nil
179 }
180 for {
181 req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/xrpc/com.atproto.sync.listRepos?limit=1000&cursor=%s", service, cursor), nil)
182 if err != nil {
183 return nil, err
184 }
185
186 if rd.p.ratelimitBypassKey != "" && strings.HasSuffix(service, ".bsky.network") {
187 req.Header.Set("x-ratelimit-bypass", rd.p.ratelimitBypassKey)
188 }
189
190 rl := rd.getRateLimiter(service)
191 rl.Take()
192
193 cli := rd.getClient(service)
194 resp, err := cli.Do(req)
195 if err != nil {
196 return nil, err
197 }
198 defer resp.Body.Close()
199
200 if resp.StatusCode != http.StatusOK {
201 return nil, fmt.Errorf("received non-200 response code: %d", resp.StatusCode)
202 }
203
204 var reposResp ListReposResponse
205 if err := json.NewDecoder(resp.Body).Decode(&reposResp); err != nil {
206 return nil, fmt.Errorf("error decoding repos response: %w", err)
207 }
208
209 for _, repo := range reposResp.Repos {
210 if repo.Status != nil {
211 if *repo.Status == "deleted" || *repo.Status == "takendown" || *repo.Status == "deactivated" {
212 continue
213 }
214 }
215
216 repos = append(repos, repo)
217 }
218
219 if len(reposResp.Repos) != 1000 || reposResp.Cursor == "" {
220 break
221 }
222
223 fmt.Printf("cursor %s service %s\n", reposResp.Cursor, service)
224
225 cursor = reposResp.Cursor
226 }
227
228 return repos, nil
229}
230
231type ListServicesResponse struct {
232 Cursor string `json:"cursor"`
233 Hosts []ListServicesResponseItem `json:"hosts"`
234}
235
// ListServicesResponseItem is a single host entry of a ListServicesResponse.
type ListServicesResponseItem struct {
	// Hostname is the bare host name; callers prepend the scheme.
	Hostname string `json:"hostname"`
	// Status is the host status string as reported in the response.
	Status string `json:"status"`
}
240
241func (p *Photocopy) runBackfiller(ctx context.Context) error {
242 startTime := time.Now()
243
244 fmt.Println("querying clickhouse for dids and services...")
245
246 var hostsCursor string
247 var sevs []ListServicesResponseItem
248 for {
249 if hostsCursor != "" {
250 hostsCursor = "&cursor=" + hostsCursor
251 }
252 req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("https://relay1.us-east.bsky.network/xrpc/com.atproto.sync.listHosts?limit=1000%s", hostsCursor), nil)
253 if err != nil {
254 return err
255 }
256
257 resp, err := http.DefaultClient.Do(req)
258 if err != nil {
259 return err
260 }
261 defer resp.Body.Close()
262
263 if resp.StatusCode != http.StatusOK {
264 return fmt.Errorf("received non-200 response code: %d", resp.StatusCode)
265 }
266
267 var sevsResp ListServicesResponse
268 if err := json.NewDecoder(resp.Body).Decode(&sevsResp); err != nil {
269 return fmt.Errorf("error decoding sevs response: %w", err)
270 }
271
272 for _, sev := range sevsResp.Hosts {
273 if sev.Status != "active" {
274 continue
275 }
276
277 sevs = append(sevs, sev)
278 }
279
280 if len(sevsResp.Hosts) != 1000 || sevsResp.Cursor == "" {
281 break
282 }
283
284 hostsCursor = sevsResp.Cursor
285 }
286
287 servicesDids := map[string][]string{}
288 for _, sev := range sevs {
289 servicesDids["https://"+sev.Hostname] = []string{}
290 }
291
292 fmt.Printf("found %d services\n", len(servicesDids))
293
294 fmt.Printf("collecting dids...\n")
295
296 fmt.Printf("building download buckets...")
297
298 skipped := 0
299 downloader := NewRepoDownloader(p)
300 serviceDids := map[string][]string{}
301
302 wg := sync.WaitGroup{}
303 mplk := sync.Mutex{}
304 for s := range servicesDids {
305 wg.Add(1)
306 go func() {
307 defer wg.Done()
308 repos, err := downloader.getDidsFromService(context.TODO(), s)
309 if err != nil {
310 fmt.Printf("error getting dids for services %s: %v", s, err)
311 return
312 }
313 dids := []string{}
314 for _, r := range repos {
315 dids = append(dids, r.Did)
316 }
317 mplk.Lock()
318 defer mplk.Unlock()
319 serviceDids[s] = dids
320 }()
321 }
322
323 fmt.Println("getting all the repos...")
324 wg.Wait()
325
326 fmt.Printf("was able to skip %d repos\n", skipped)
327
328 total := 0
329
330 for service, dids := range serviceDids {
331 if len(dids) < 100 {
332 continue
333 }
334 fmt.Printf("%s: %d jobs\n", service, len(dids))
335 total += len(dids)
336 }
337
338 fmt.Printf("Total jobs: %d across %d services \n", total, len(serviceDids))
339
340 for _, c := range downloader.clients {
341 c.Timeout = 10 * time.Minute
342 }
343
344 for s := range downloader.rateLimits {
345 if p.ratelimitBypassKey != "" && strings.HasSuffix(s, ".bsky.network") {
346 downloader.rateLimits[s] = ratelimit.New(25)
347 }
348 }
349
350 processed := 0
351 errored := 0
352 var errors []error
353 for service, dids := range serviceDids {
354 go func() {
355 for _, did := range dids {
356 ratelimiter := downloader.getRateLimiter(service)
357 ratelimiter.Take()
358
359 b, err := downloader.downloadRepo(service, did)
360 if err != nil {
361 errored++
362 processed++
363 errors = append(errors, err)
364 continue
365 }
366
367 go func(b []byte, did string) {
368 if err := p.processRepo(ctx, b, did); err != nil {
369 fmt.Printf("error processing backfill record: %v\n", err)
370 }
371 }(b, did)
372
373 processed++
374 }
375 }()
376 }
377
378 ticker := time.NewTicker(1 * time.Second)
379 defer ticker.Stop()
380
381 for range ticker.C {
382 elapsed := time.Since(startTime)
383 rate := float64(processed) / elapsed.Seconds()
384 remaining := total - processed
385
386 var eta string
387 if rate > 0 {
388 etaSeconds := float64(remaining) / rate
389 etaDuration := time.Duration(etaSeconds * float64(time.Second))
390 eta = fmt.Sprintf(", ETA: %v", etaDuration.Round(time.Second))
391 } else {
392 eta = ", ETA: calculating..."
393 }
394
395 for _, err := range errors {
396 fmt.Printf("%v\n", err)
397 }
398
399 errors = nil
400
401 fmt.Printf("\rProgress: %d/%d processed (%.1f%%), %d skipped, %d errors, %.1f jobs/sec%s",
402 processed, total, float64(processed)/float64(total)*100, skipped, errored, rate, eta)
403 }
404
405 fmt.Printf("\nCompleted: %d processed, %d errors\n", processed, errored)
406
407 return nil
408}