this repo has no description
at main 10 kB view raw
1package photocopy 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "net/http" 10 "runtime" 11 "strings" 12 "sync" 13 "time" 14 15 atproto_repo "github.com/bluesky-social/indigo/atproto/repo" 16 "github.com/bluesky-social/indigo/repo" 17 "github.com/bluesky-social/indigo/util" 18 "github.com/ipfs/go-cid" 19 "github.com/ipld/go-car" 20 _ "github.com/joho/godotenv/autoload" 21 "go.uber.org/ratelimit" 22) 23 24type ProcessJob struct { 25 did string 26 repoBytes []byte 27} 28 29type RepoDownloader struct { 30 clients map[string]*http.Client 31 rateLimits map[string]ratelimit.Limiter 32 processChan chan ProcessJob 33 mu sync.RWMutex 34 p *Photocopy 35} 36 37func NewRepoDownloader(p *Photocopy) *RepoDownloader { 38 return &RepoDownloader{ 39 clients: make(map[string]*http.Client), 40 rateLimits: make(map[string]ratelimit.Limiter), 41 p: p, 42 processChan: make(chan ProcessJob, 1000), 43 } 44} 45 46func (rd *RepoDownloader) getClient(service string) *http.Client { 47 rd.mu.RLock() 48 client, exists := rd.clients[service] 49 rd.mu.RUnlock() 50 51 if exists { 52 return client 53 } 54 55 rd.mu.Lock() 56 defer rd.mu.Unlock() 57 58 if client, exists := rd.clients[service]; exists { 59 return client 60 } 61 62 client = util.RobustHTTPClient() 63 client.Timeout = 45 * time.Second 64 rd.clients[service] = client 65 return client 66} 67 68func (rd *RepoDownloader) getRateLimiter(service string) ratelimit.Limiter { 69 if !strings.HasSuffix(service, ".bsky.network") { 70 service = "third-party" 71 } 72 73 rd.mu.RLock() 74 limiter, exists := rd.rateLimits[service] 75 rd.mu.RUnlock() 76 77 if exists { 78 return limiter 79 } 80 81 rd.mu.Lock() 82 defer rd.mu.Unlock() 83 84 if limiter, exists := rd.rateLimits[service]; exists { 85 return limiter 86 } 87 88 // 3000 per five minutes 89 limiter = ratelimit.New(10) 90 rd.rateLimits[service] = limiter 91 return limiter 92} 93 94func (rd *RepoDownloader) downloadRepo(service, did string) ([]byte, error) { 95 dlurl := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", service, did) 96 97 req, err := http.NewRequestWithContext(context.TODO(), "GET", dlurl, nil) 98 if err != nil { 99 return nil, fmt.Errorf("failed to create request: %w", err) 100 } 101 102 if rd.p.ratelimitBypassKey != "" && strings.HasSuffix(service, ".bsky.network") { 103 req.Header.Set("x-ratelimit-bypass", rd.p.ratelimitBypassKey) 104 } 105 106 client := rd.getClient(service) 107 108 resp, err := client.Do(req) 109 if err != nil { 110 return nil, fmt.Errorf("failed to download repo: %w", err) 111 } 112 defer resp.Body.Close() 113 114 if resp.StatusCode != http.StatusOK { 115 if resp.StatusCode == 400 { 116 return nil, nil 117 } 118 return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 119 } 120 121 b, err := io.ReadAll(resp.Body) 122 if err != nil { 123 return nil, fmt.Errorf("could not read bytes from response: %w", err) 124 } 125 126 return b, nil 127} 128 129func (p *Photocopy) processRepo(ctx context.Context, b []byte, did string) error { 130 bs := atproto_repo.NewTinyBlockstore() 131 cs, err := car.NewCarReader(bytes.NewReader(b)) 132 if err != nil { 133 fmt.Println("error opening car", err) 134 return fmt.Errorf("error opening car: %v\n", err) 135 } 136 137 currBlock, _ := cs.Next() 138 for currBlock != nil { 139 bs.Put(context.TODO(), currBlock) 140 next, _ := cs.Next() 141 currBlock = next 142 } 143 144 r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0]) 145 if err != nil || r == nil { 146 fmt.Println("error opening repo", err) 147 fmt.Printf("could not open repo: %v", err) 148 return nil 149 } 150 151 if err := r.ForEach(context.TODO(), "", func(key string, cid cid.Cid) error { 152 pts := strings.Split(key, "/") 153 nsid := pts[0] 154 rkey := pts[1] 155 cidStr := cid.String() 156 b, err := bs.Get(context.TODO(), cid) 157 if err != nil { 158 return nil 159 } 160 if err := p.handleCreate(ctx, b.RawData(), time.Now().Format(time.RFC3339Nano), "unk", did, nsid, rkey, cidStr, "unk"); err != nil { 161 return err 162 } 163 return nil 164 }); err != nil { 165 return fmt.Errorf("erorr traversing records: %v", err) 166 } 167 168 return nil 169} 170 171type ListReposResponse struct { 172 Cursor string `json:"cursor"` 173 Repos []ListReposRepo `json:"repos"` 174} 175 176type ListReposRepo struct { 177 Did string `json:"did"` 178 Head string `json:"head"` 179 Rev string `json:"rev"` 180 Active bool `json:"active"` 181 Status *string `json:"status,omitempty"` 182} 183 184func (rd *RepoDownloader) getDidsFromService(ctx context.Context, service string) ([]ListReposRepo, error) { 185 var cursor string 186 var repos []ListReposRepo 187 if service == "https://atproto.brid.gy" { 188 return nil, nil 189 } 190 for { 191 req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/xrpc/com.atproto.sync.listRepos?limit=1000&cursor=%s", service, cursor), nil) 192 if err != nil { 193 return nil, err 194 } 195 196 if rd.p.ratelimitBypassKey != "" && strings.HasSuffix(service, ".bsky.network") { 197 req.Header.Set("x-ratelimit-bypass", rd.p.ratelimitBypassKey) 198 } 199 200 rl := rd.getRateLimiter(service) 201 rl.Take() 202 203 cli := rd.getClient(service) 204 resp, err := cli.Do(req) 205 if err != nil { 206 return nil, err 207 } 208 defer resp.Body.Close() 209 210 if resp.StatusCode != http.StatusOK { 211 return nil, fmt.Errorf("received non-200 response code: %d", resp.StatusCode) 212 } 213 214 var reposResp ListReposResponse 215 if err := json.NewDecoder(resp.Body).Decode(&reposResp); err != nil { 216 return nil, fmt.Errorf("error decoding repos response: %w", err) 217 } 218 219 for _, repo := range reposResp.Repos { 220 if repo.Status != nil { 221 if *repo.Status == "deleted" || *repo.Status == "takendown" || *repo.Status == "deactivated" { 222 continue 223 } 224 } 225 226 repos = append(repos, repo) 227 } 228 229 if len(reposResp.Repos) != 1000 || reposResp.Cursor == "" { 230 break 231 } 232 233 fmt.Printf("cursor %s service %s\n", reposResp.Cursor, service) 234 235 cursor = reposResp.Cursor 236 } 237 238 return repos, nil 239} 240 241type ListServicesResponse struct { 242 Cursor string `json:"cursor"` 243 Hosts []ListServicesResponseItem `json:"hosts"` 244} 245 246type ListServicesResponseItem struct { 247 Hostname string `json:"hostname"` 248 Status string `json:"status"` 249} 250 251func (p *Photocopy) runProcessRepoWorker(ctx context.Context, jobs <-chan ProcessJob) { 252 for j := range jobs { 253 p.processRepo(ctx, j.repoBytes, j.did) 254 } 255} 256 257func (p *Photocopy) runBackfiller(ctx context.Context) error { 258 startTime := time.Now() 259 260 fmt.Println("querying clickhouse for dids and services...") 261 262 type alreadyFetchedItem struct { 263 Did string `ch:"did"` 264 } 265 var alreadyFetched []alreadyFetchedItem 266 if err := p.conn.Select(ctx, &alreadyFetched, "SELECT DISTINCT(did) FROM default.record WHERE created_at < '2025-07-01'"); err != nil { 267 return err 268 } 269 270 alreadyFetchedMap := map[string]bool{} 271 for _, d := range alreadyFetched { 272 alreadyFetchedMap[d.Did] = true 273 } 274 275 fmt.Println("getting dids") 276 277 var hostsCursor string 278 var sevs []ListServicesResponseItem 279 for { 280 if hostsCursor != "" { 281 hostsCursor = "&cursor=" + hostsCursor 282 } 283 req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("https://relay1.us-east.bsky.network/xrpc/com.atproto.sync.listHosts?limit=1000%s", hostsCursor), nil) 284 if err != nil { 285 return err 286 } 287 288 resp, err := http.DefaultClient.Do(req) 289 if err != nil { 290 return err 291 } 292 defer resp.Body.Close() 293 294 if resp.StatusCode != http.StatusOK { 295 return fmt.Errorf("received non-200 response code: %d", resp.StatusCode) 296 } 297 298 var sevsResp ListServicesResponse 299 if err := json.NewDecoder(resp.Body).Decode(&sevsResp); err != nil { 300 return fmt.Errorf("error decoding sevs response: %w", err) 301 } 302 303 for _, sev := range sevsResp.Hosts { 304 if sev.Status != "active" { 305 continue 306 } 307 308 sevs = append(sevs, sev) 309 } 310 311 if len(sevsResp.Hosts) != 1000 || sevsResp.Cursor == "" { 312 break 313 } 314 315 hostsCursor = sevsResp.Cursor 316 } 317 318 servicesDids := map[string][]string{} 319 for _, sev := range sevs { 320 servicesDids["https://"+sev.Hostname] = []string{} 321 } 322 323 fmt.Printf("found %d services\n", len(servicesDids)) 324 325 fmt.Printf("collecting dids...\n") 326 327 fmt.Printf("building download buckets...") 328 329 skipped := 0 330 downloader := NewRepoDownloader(p) 331 serviceDids := map[string][]string{} 332 333 for range runtime.NumCPU() / 2 { 334 go p.runProcessRepoWorker(ctx, downloader.processChan) 335 } 336 337 wg := sync.WaitGroup{} 338 mplk := sync.Mutex{} 339 for s := range servicesDids { 340 wg.Add(1) 341 go func() { 342 defer wg.Done() 343 repos, err := downloader.getDidsFromService(context.TODO(), s) 344 if err != nil { 345 fmt.Printf("error getting dids for services %s: %v", s, err) 346 return 347 } 348 dids := []string{} 349 for _, r := range repos { 350 if alreadyFetchedMap[r.Did] { 351 skipped++ 352 continue 353 } 354 dids = append(dids, r.Did) 355 } 356 mplk.Lock() 357 defer mplk.Unlock() 358 serviceDids[s] = dids 359 }() 360 } 361 362 fmt.Println("getting all the repos...") 363 wg.Wait() 364 365 fmt.Printf("was able to skip %d repos\n", skipped) 366 367 total := 0 368 369 for service, dids := range serviceDids { 370 if len(dids) < 100 { 371 continue 372 } 373 fmt.Printf("%s: %d jobs\n", service, len(dids)) 374 total += len(dids) 375 } 376 377 fmt.Printf("Total jobs: %d across %d services \n", total, len(serviceDids)) 378 379 for _, c := range downloader.clients { 380 c.Timeout = 10 * time.Minute 381 } 382 383 for s := range downloader.rateLimits { 384 if p.ratelimitBypassKey != "" && strings.HasSuffix(s, ".bsky.network") { 385 downloader.rateLimits[s] = ratelimit.New(25) 386 } 387 } 388 389 processed := 0 390 errored := 0 391 var errors []error 392 for service, dids := range serviceDids { 393 go func() { 394 for _, did := range dids { 395 ratelimiter := downloader.getRateLimiter(service) 396 ratelimiter.Take() 397 398 b, err := downloader.downloadRepo(service, did) 399 if err != nil { 400 errored++ 401 processed++ 402 errors = append(errors, err) 403 continue 404 } 405 406 go func(b []byte, did string) { 407 downloader.processChan <- ProcessJob{repoBytes: b, did: did} 408 }(b, did) 409 410 processed++ 411 } 412 }() 413 } 414 415 ticker := time.NewTicker(1 * time.Second) 416 defer ticker.Stop() 417 418 for range ticker.C { 419 elapsed := time.Since(startTime) 420 rate := float64(processed) / elapsed.Seconds() 421 remaining := total - processed 422 423 var eta string 424 if rate > 0 { 425 etaSeconds := float64(remaining) / rate 426 etaDuration := time.Duration(etaSeconds * float64(time.Second)) 427 eta = fmt.Sprintf(", ETA: %v", etaDuration.Round(time.Second)) 428 } else { 429 eta = ", ETA: calculating..." 430 } 431 432 for _, err := range errors { 433 fmt.Printf("%v\n", err) 434 } 435 436 errors = nil 437 438 fmt.Printf("\rProgress: %d/%d processed (%.1f%%), %d skipped, %d errors, %.1f jobs/sec%s", 439 processed, total, float64(processed)/float64(total)*100, skipped, errored, rate, eta) 440 } 441 442 fmt.Printf("\nCompleted: %d processed, %d errors\n", processed, errored) 443 444 return nil 445}