···
16
-
"github.com/ClickHouse/clickhouse-go/v2"
17
-
atproto_repo "github.com/bluesky-social/indigo/atproto/repo"
18
-
"github.com/bluesky-social/indigo/atproto/syntax"
19
-
"github.com/bluesky-social/indigo/repo"
20
-
"github.com/bluesky-social/indigo/util"
21
-
"github.com/haileyok/photocopy/clickhouse_inserter"
22
-
"github.com/haileyok/photocopy/models"
23
-
"github.com/ipfs/go-cid"
24
-
"github.com/ipld/go-car"
25
-
_ "github.com/joho/godotenv/autoload"
26
-
"github.com/urfave/cli/v2"
27
-
"go.uber.org/ratelimit"
36
-
Name: "clickhouse-addr",
37
-
EnvVars: []string{"PHOTOCOPY_CLICKHOUSE_ADDR"},
41
-
Name: "clickhouse-database",
42
-
EnvVars: []string{"PHOTOCOPY_CLICKHOUSE_DATABASE"},
46
-
Name: "clickhouse-user",
47
-
EnvVars: []string{"PHOTOCOPY_CLICKHOUSE_USER"},
51
-
Name: "clickhouse-pass",
52
-
EnvVars: []string{"PHOTOCOPY_CLICKHOUSE_PASS"},
65
-
type RepoDownloader struct {
66
-
clients map[string]*http.Client
67
-
rateLimits map[string]ratelimit.Limiter
71
-
func NewRepoDownloader() *RepoDownloader {
72
-
return &RepoDownloader{
73
-
clients: make(map[string]*http.Client),
74
-
rateLimits: make(map[string]ratelimit.Limiter),
78
-
func (rd *RepoDownloader) getClient(service string) *http.Client {
80
-
client, exists := rd.clients[service]
88
-
defer rd.mu.Unlock()
90
-
if client, exists := rd.clients[service]; exists {
94
-
client = util.RobustHTTPClient()
95
-
client.Timeout = 30 * time.Minute
96
-
rd.clients[service] = client
100
-
func (rd *RepoDownloader) getRateLimiter(service string) ratelimit.Limiter {
102
-
limiter, exists := rd.rateLimits[service]
110
-
defer rd.mu.Unlock()
112
-
if limiter, exists := rd.rateLimits[service]; exists {
116
-
// 3000 per five minutes
117
-
limiter = ratelimit.New(10)
118
-
rd.rateLimits[service] = limiter
122
-
func (rd *RepoDownloader) downloadRepo(service, did string) ([]byte, error) {
123
-
dlurl := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", service, did)
125
-
req, err := http.NewRequestWithContext(context.TODO(), "GET", dlurl, nil)
127
-
return nil, fmt.Errorf("failed to create request: %w", err)
130
-
client := rd.getClient(service)
132
-
resp, err := client.Do(req)
134
-
return nil, fmt.Errorf("failed to download repo: %w", err)
136
-
defer resp.Body.Close()
138
-
if resp.StatusCode != http.StatusOK {
139
-
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
142
-
b, err := io.ReadAll(resp.Body)
144
-
return nil, fmt.Errorf("could not read bytes from response: %w", err)
150
-
func processRepo(b []byte, did string, inserter *clickhouse_inserter.Inserter) error {
151
-
bs := atproto_repo.NewTinyBlockstore()
152
-
cs, err := car.NewCarReader(bytes.NewReader(b))
154
-
return fmt.Errorf("error opening car: %v\n", err)
157
-
currBlock, _ := cs.Next()
158
-
for currBlock != nil {
159
-
bs.Put(context.TODO(), currBlock)
160
-
next, _ := cs.Next()
164
-
r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0])
165
-
if err != nil || r == nil {
166
-
fmt.Printf("could not open repo: %v", err)
170
-
if err := r.ForEach(context.TODO(), "", func(key string, cid cid.Cid) error {
171
-
pts := strings.Split(key, "/")
174
-
cidStr := cid.String()
175
-
b, err := bs.Get(context.TODO(), cid)
181
-
tid, err := syntax.ParseTID(rkey)
188
-
rec := models.Record{
194
-
Raw: string(b.RawData()),
198
-
inserter.Insert(context.TODO(), rec)
202
-
return fmt.Errorf("erorr traversing records: %v", err)
208
-
type ListReposResponse struct {
209
-
Cursor string `json:"cursor"`
210
-
Repos []ListReposRepo `json:"repos"`
213
-
type ListReposRepo struct {
214
-
Did string `json:"did"`
215
-
Head string `json:"head"`
216
-
Rev string `json:"rev"`
217
-
Active bool `json:"active"`
218
-
Status *string `json:"status,omitempty"`
221
-
func (rd *RepoDownloader) getDidsFromService(ctx context.Context, service string) ([]ListReposRepo, error) {
223
-
var repos []ListReposRepo
225
-
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/xrpc/com.atproto.sync.listRepos?limit=1000&cursor=%s", service, cursor), nil)
230
-
rl := rd.getRateLimiter(service)
233
-
cli := rd.getClient(service)
234
-
resp, err := cli.Do(req)
238
-
defer resp.Body.Close()
240
-
if resp.StatusCode != http.StatusOK {
241
-
return nil, fmt.Errorf("received non-200 response code: %d", resp.StatusCode)
244
-
var reposResp ListReposResponse
245
-
if err := json.NewDecoder(resp.Body).Decode(&reposResp); err != nil {
246
-
return nil, fmt.Errorf("error decoding repos response: %w", err)
249
-
repos = append(repos, reposResp.Repos...)
251
-
if len(reposResp.Repos) != 1000 {
259
-
var run = func(cmd *cli.Context) error {
260
-
startTime := time.Now()
262
-
conn, err := clickhouse.Open(&clickhouse.Options{
263
-
Addr: []string{cmd.String("clickhouse-addr")},
264
-
Auth: clickhouse.Auth{
265
-
Database: cmd.String("clickhouse-database"),
266
-
Username: cmd.String("clickhouse-user"),
267
-
Password: cmd.String("clickhouse-pass"),
275
-
fmt.Println("querying clickhouse for dids and services...")
277
-
type servicesQueryRow struct {
278
-
PlcOpServices []string `ch:"plc_op_services"`
280
-
var servicesQueryRows []servicesQueryRow
281
-
if err := conn.Select(cmd.Context, &servicesQueryRows, `
282
-
SELECT DISTINCT(plc_op_services) FROM default.plc WHERE arrayExists(x -> x LIKE '%.bsky.network', plc_op_services)
287
-
servicesDids := map[string][]string{}
288
-
for _, svcs := range servicesQueryRows {
289
-
for _, s := range svcs.PlcOpServices {
290
-
servicesDids[s] = []string{}
294
-
fmt.Printf("found %d services\n", len(servicesDids))
296
-
fmt.Printf("getting most recent record for each did...")
297
-
var records []models.Record
298
-
if err := conn.Select(cmd.Context, &records, `
299
-
SELECT did, created_at
300
-
FROM default.record
301
-
QUALIFY row_number() OVER (PARTITION BY did ORDER BY created_at ASC) = 1
306
-
fmt.Printf("collecting dids...\n")
308
-
didCreatedAt := map[string]time.Time{}
309
-
for _, r := range records {
310
-
didCreatedAt[r.Did] = r.CreatedAt
313
-
inserter, err := clickhouse_inserter.New(context.TODO(), &clickhouse_inserter.Args{
315
-
Logger: slog.Default(),
317
-
Query: "INSERT INTO record (did, rkey, collection, cid, seq, raw, created_at)",
318
-
RateLimit: 2, // two inserts per second in the event of massive repos
324
-
fmt.Printf("building download buckets...")
328
-
needOlderThan, _ := time.Parse(time.DateTime, "2025-06-28 04:18:22")
329
-
downloader := NewRepoDownloader()
330
-
serviceDids := map[string][]string{}
332
-
wg := sync.WaitGroup{}
333
-
for s := range servicesDids {
337
-
repos, err := downloader.getDidsFromService(context.TODO(), s)
339
-
fmt.Printf("error getting dids for services %s: %v", s, err)
343
-
for _, r := range repos {
344
-
lastRecord, exists := didCreatedAt[r.Did]
345
-
if exists && lastRecord.Before(needOlderThan) {
350
-
dids = append(dids, r.Did)
352
-
serviceDids[s] = dids
356
-
fmt.Println("getting all the repos...")
359
-
fmt.Printf("Total jobs: %d across %d services \n", total, len(serviceDids))
360
-
fmt.Printf("was able to skip %d repos\n", skipped)
362
-
for service, dids := range serviceDids {
363
-
if len(dids) < 100 {
366
-
fmt.Printf("%s: %d jobs\n", service, len(dids))
372
-
for service, dids := range serviceDids {
374
-
for _, did := range dids {
375
-
ratelimiter := downloader.getRateLimiter(service)
378
-
b, err := downloader.downloadRepo(service, did)
385
-
go func(b []byte, did string, inserter *clickhouse_inserter.Inserter) {
386
-
processRepo(b, did, inserter)
387
-
}(b, did, inserter)
394
-
ticker := time.NewTicker(1 * time.Second)
395
-
defer ticker.Stop()
397
-
for range ticker.C {
398
-
elapsed := time.Since(startTime)
399
-
rate := float64(processed) / elapsed.Seconds()
400
-
remaining := total - processed
404
-
etaSeconds := float64(remaining) / rate
405
-
etaDuration := time.Duration(etaSeconds * float64(time.Second))
406
-
eta = fmt.Sprintf(", ETA: %v", etaDuration.Round(time.Second))
408
-
eta = ", ETA: calculating..."
411
-
fmt.Printf("\rProgress: %d/%d processed (%.1f%%), %d skipped, %d errors, %.1f jobs/sec%s",
412
-
processed, total, float64(processed)/float64(total)*100, skipped, errored, rate, eta)
415
-
fmt.Printf("\nCompleted: %d processed, %d errors\n", processed, errored)
417
-
inserter.Close(context.TODO())