this repo has no description
1package photocopy 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "net/http" 10 "strings" 11 "sync" 12 "time" 13 14 atproto_repo "github.com/bluesky-social/indigo/atproto/repo" 15 "github.com/bluesky-social/indigo/repo" 16 "github.com/bluesky-social/indigo/util" 17 "github.com/ipfs/go-cid" 18 "github.com/ipld/go-car" 19 _ "github.com/joho/godotenv/autoload" 20 "go.uber.org/ratelimit" 21) 22 23type RepoDownloader struct { 24 clients map[string]*http.Client 25 rateLimits map[string]ratelimit.Limiter 26 mu sync.RWMutex 27 p *Photocopy 28} 29 30func NewRepoDownloader(p *Photocopy) *RepoDownloader { 31 return &RepoDownloader{ 32 clients: make(map[string]*http.Client), 33 rateLimits: make(map[string]ratelimit.Limiter), 34 p: p, 35 } 36} 37 38func (rd *RepoDownloader) getClient(service string) *http.Client { 39 rd.mu.RLock() 40 client, exists := rd.clients[service] 41 rd.mu.RUnlock() 42 43 if exists { 44 return client 45 } 46 47 rd.mu.Lock() 48 defer rd.mu.Unlock() 49 50 if client, exists := rd.clients[service]; exists { 51 return client 52 } 53 54 client = util.RobustHTTPClient() 55 client.Timeout = 45 * time.Second 56 rd.clients[service] = client 57 return client 58} 59 60func (rd *RepoDownloader) getRateLimiter(service string) ratelimit.Limiter { 61 if !strings.HasSuffix(service, ".bsky.network") { 62 service = "third-party" 63 } 64 65 rd.mu.RLock() 66 limiter, exists := rd.rateLimits[service] 67 rd.mu.RUnlock() 68 69 if exists { 70 return limiter 71 } 72 73 rd.mu.Lock() 74 defer rd.mu.Unlock() 75 76 if limiter, exists := rd.rateLimits[service]; exists { 77 return limiter 78 } 79 80 // 3000 per five minutes 81 limiter = ratelimit.New(10) 82 rd.rateLimits[service] = limiter 83 return limiter 84} 85 86func (rd *RepoDownloader) downloadRepo(service, did string) ([]byte, error) { 87 dlurl := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", service, did) 88 89 req, err := http.NewRequestWithContext(context.TODO(), "GET", dlurl, nil) 90 if err != nil { 91 return nil, fmt.Errorf("failed to create request: %w", err) 92 } 93 94 if rd.p.ratelimitBypassKey != "" && strings.HasSuffix(service, ".bsky.network") { 95 req.Header.Set("x-ratelimit-bypass", rd.p.ratelimitBypassKey) 96 } 97 98 client := rd.getClient(service) 99 100 resp, err := client.Do(req) 101 if err != nil { 102 return nil, fmt.Errorf("failed to download repo: %w", err) 103 } 104 defer resp.Body.Close() 105 106 if resp.StatusCode != http.StatusOK { 107 if resp.StatusCode == 400 { 108 return nil, nil 109 } 110 return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 111 } 112 113 b, err := io.ReadAll(resp.Body) 114 if err != nil { 115 return nil, fmt.Errorf("could not read bytes from response: %w", err) 116 } 117 118 return b, nil 119} 120 121func (p *Photocopy) processRepo(ctx context.Context, b []byte, did string) error { 122 bs := atproto_repo.NewTinyBlockstore() 123 cs, err := car.NewCarReader(bytes.NewReader(b)) 124 if err != nil { 125 return fmt.Errorf("error opening car: %v\n", err) 126 } 127 128 currBlock, _ := cs.Next() 129 for currBlock != nil { 130 bs.Put(context.TODO(), currBlock) 131 next, _ := cs.Next() 132 currBlock = next 133 } 134 135 r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0]) 136 if err != nil || r == nil { 137 fmt.Printf("could not open repo: %v", err) 138 return nil 139 } 140 141 if err := r.ForEach(context.TODO(), "", func(key string, cid cid.Cid) error { 142 pts := strings.Split(key, "/") 143 nsid := pts[0] 144 rkey := pts[1] 145 cidStr := cid.String() 146 b, err := bs.Get(context.TODO(), cid) 147 if err != nil { 148 return nil 149 } 150 if err := p.handleCreate(ctx, b.RawData(), time.Now().Format(time.RFC3339Nano), "unk", did, nsid, rkey, cidStr, "unk"); err != nil { 151 return err 152 } 153 return nil 154 }); err != nil { 155 return fmt.Errorf("erorr traversing records: %v", err) 156 } 157 158 return nil 159} 160 161type ListReposResponse struct { 162 Cursor string `json:"cursor"` 163 Repos []ListReposRepo `json:"repos"` 164} 165 166type ListReposRepo struct { 167 Did string `json:"did"` 168 Head string `json:"head"` 169 Rev string `json:"rev"` 170 Active bool `json:"active"` 171 Status *string `json:"status,omitempty"` 172} 173 174func (rd *RepoDownloader) getDidsFromService(ctx context.Context, service string) ([]ListReposRepo, error) { 175 var cursor string 176 var repos []ListReposRepo 177 if service == "https://atproto.brid.gy" { 178 return nil, nil 179 } 180 for { 181 req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/xrpc/com.atproto.sync.listRepos?limit=1000&cursor=%s", service, cursor), nil) 182 if err != nil { 183 return nil, err 184 } 185 186 if rd.p.ratelimitBypassKey != "" && strings.HasSuffix(service, ".bsky.network") { 187 req.Header.Set("x-ratelimit-bypass", rd.p.ratelimitBypassKey) 188 } 189 190 rl := rd.getRateLimiter(service) 191 rl.Take() 192 193 cli := rd.getClient(service) 194 resp, err := cli.Do(req) 195 if err != nil { 196 return nil, err 197 } 198 defer resp.Body.Close() 199 200 if resp.StatusCode != http.StatusOK { 201 return nil, fmt.Errorf("received non-200 response code: %d", resp.StatusCode) 202 } 203 204 var reposResp ListReposResponse 205 if err := json.NewDecoder(resp.Body).Decode(&reposResp); err != nil { 206 return nil, fmt.Errorf("error decoding repos response: %w", err) 207 } 208 209 for _, repo := range reposResp.Repos { 210 if repo.Status != nil { 211 if *repo.Status == "deleted" || *repo.Status == "takendown" || *repo.Status == "deactivated" { 212 continue 213 } 214 } 215 216 repos = append(repos, repo) 217 } 218 219 if len(reposResp.Repos) != 1000 || reposResp.Cursor == "" { 220 break 221 } 222 223 fmt.Printf("cursor %s service %s\n", reposResp.Cursor, service) 224 225 cursor = reposResp.Cursor 226 } 227 228 return repos, nil 229} 230 231type ListServicesResponse struct { 232 Cursor string `json:"cursor"` 233 Hosts []ListServicesResponseItem `json:"hosts"` 234} 235 236type ListServicesResponseItem struct { 237 Hostname string `json:"hostname"` 238 Status string `json:"status"` 239} 240 241func (p *Photocopy) runBackfiller(ctx context.Context) error { 242 startTime := time.Now() 243 244 fmt.Println("querying clickhouse for dids and services...") 245 246 var hostsCursor string 247 var sevs []ListServicesResponseItem 248 for { 249 if hostsCursor != "" { 250 hostsCursor = "&cursor=" + hostsCursor 251 } 252 req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("https://relay1.us-east.bsky.network/xrpc/com.atproto.sync.listHosts?limit=1000%s", hostsCursor), nil) 253 if err != nil { 254 return err 255 } 256 257 resp, err := http.DefaultClient.Do(req) 258 if err != nil { 259 return err 260 } 261 defer resp.Body.Close() 262 263 if resp.StatusCode != http.StatusOK { 264 return fmt.Errorf("received non-200 response code: %d", resp.StatusCode) 265 } 266 267 var sevsResp ListServicesResponse 268 if err := json.NewDecoder(resp.Body).Decode(&sevsResp); err != nil { 269 return fmt.Errorf("error decoding sevs response: %w", err) 270 } 271 272 for _, sev := range sevsResp.Hosts { 273 if sev.Status != "active" { 274 continue 275 } 276 277 sevs = append(sevs, sev) 278 } 279 280 if len(sevsResp.Hosts) != 1000 || sevsResp.Cursor == "" { 281 break 282 } 283 284 hostsCursor = sevsResp.Cursor 285 } 286 287 servicesDids := map[string][]string{} 288 for _, sev := range sevs { 289 servicesDids["https://"+sev.Hostname] = []string{} 290 } 291 292 fmt.Printf("found %d services\n", len(servicesDids)) 293 294 fmt.Printf("collecting dids...\n") 295 296 fmt.Printf("building download buckets...") 297 298 skipped := 0 299 downloader := NewRepoDownloader(p) 300 serviceDids := map[string][]string{} 301 302 wg := sync.WaitGroup{} 303 mplk := sync.Mutex{} 304 for s := range servicesDids { 305 wg.Add(1) 306 go func() { 307 defer wg.Done() 308 repos, err := downloader.getDidsFromService(context.TODO(), s) 309 if err != nil { 310 fmt.Printf("error getting dids for services %s: %v", s, err) 311 return 312 } 313 dids := []string{} 314 for _, r := range repos { 315 dids = append(dids, r.Did) 316 } 317 mplk.Lock() 318 defer mplk.Unlock() 319 serviceDids[s] = dids 320 }() 321 } 322 323 fmt.Println("getting all the repos...") 324 wg.Wait() 325 326 fmt.Printf("was able to skip %d repos\n", skipped) 327 328 total := 0 329 330 for service, dids := range serviceDids { 331 if len(dids) < 100 { 332 continue 333 } 334 fmt.Printf("%s: %d jobs\n", service, len(dids)) 335 total += len(dids) 336 } 337 338 fmt.Printf("Total jobs: %d across %d services \n", total, len(serviceDids)) 339 340 for _, c := range downloader.clients { 341 c.Timeout = 10 * time.Minute 342 } 343 344 for s := range downloader.rateLimits { 345 if p.ratelimitBypassKey != "" && strings.HasSuffix(s, ".bsky.network") { 346 downloader.rateLimits[s] = ratelimit.New(25) 347 } 348 } 349 350 processed := 0 351 errored := 0 352 var errors []error 353 for service, dids := range serviceDids { 354 go func() { 355 for _, did := range dids { 356 ratelimiter := downloader.getRateLimiter(service) 357 ratelimiter.Take() 358 359 b, err := downloader.downloadRepo(service, did) 360 if err != nil { 361 errored++ 362 processed++ 363 errors = append(errors, err) 364 continue 365 } 366 367 go func(b []byte, did string) { 368 if err := p.processRepo(ctx, b, did); err != nil { 369 fmt.Printf("error processing backfill record: %v\n", err) 370 } 371 }(b, did) 372 373 processed++ 374 } 375 }() 376 } 377 378 ticker := time.NewTicker(1 * time.Second) 379 defer ticker.Stop() 380 381 for range ticker.C { 382 elapsed := time.Since(startTime) 383 rate := float64(processed) / elapsed.Seconds() 384 remaining := total - processed 385 386 var eta string 387 if rate > 0 { 388 etaSeconds := float64(remaining) / rate 389 etaDuration := time.Duration(etaSeconds * float64(time.Second)) 390 eta = fmt.Sprintf(", ETA: %v", etaDuration.Round(time.Second)) 391 } else { 392 eta = ", ETA: calculating..." 393 } 394 395 for _, err := range errors { 396 fmt.Printf("%v\n", err) 397 } 398 399 errors = nil 400 401 fmt.Printf("\rProgress: %d/%d processed (%.1f%%), %d skipped, %d errors, %.1f jobs/sec%s", 402 processed, total, float64(processed)/float64(total)*100, skipped, errored, rate, eta) 403 } 404 405 fmt.Printf("\nCompleted: %d processed, %d errors\n", processed, errored) 406 407 return nil 408}