this repo has no description

Compare changes

Choose any two refs to compare.

+47 -10
backfiller.go
···
"fmt"
"io"
"net/http"
"strings"
"sync"
"time"
···
"go.uber.org/ratelimit"
)
type RepoDownloader struct {
-
clients map[string]*http.Client
-
rateLimits map[string]ratelimit.Limiter
-
mu sync.RWMutex
-
p *Photocopy
}
func NewRepoDownloader(p *Photocopy) *RepoDownloader {
return &RepoDownloader{
-
clients: make(map[string]*http.Client),
-
rateLimits: make(map[string]ratelimit.Limiter),
-
p: p,
}
}
···
bs := atproto_repo.NewTinyBlockstore()
cs, err := car.NewCarReader(bytes.NewReader(b))
if err != nil {
return fmt.Errorf("error opening car: %v\n", err)
}
···
r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0])
if err != nil || r == nil {
fmt.Printf("could not open repo: %v", err)
return nil
}
···
Status string `json:"status"`
}
func (p *Photocopy) runBackfiller(ctx context.Context) error {
startTime := time.Now()
fmt.Println("querying clickhouse for dids and services...")
var hostsCursor string
var sevs []ListServicesResponseItem
for {
···
downloader := NewRepoDownloader(p)
serviceDids := map[string][]string{}
wg := sync.WaitGroup{}
mplk := sync.Mutex{}
for s := range servicesDids {
···
}
dids := []string{}
for _, r := range repos {
dids = append(dids, r.Did)
}
mplk.Lock()
···
}
go func(b []byte, did string) {
-
if err := p.processRepo(ctx, b, did); err != nil {
-
fmt.Printf("error processing backfill record: %v\n", err)
-
}
}(b, did)
processed++
···
"fmt"
"io"
"net/http"
+
"runtime"
"strings"
"sync"
"time"
···
"go.uber.org/ratelimit"
)
+
type ProcessJob struct {
+
did string
+
repoBytes []byte
+
}
+
type RepoDownloader struct {
+
clients map[string]*http.Client
+
rateLimits map[string]ratelimit.Limiter
+
processChan chan ProcessJob
+
mu sync.RWMutex
+
p *Photocopy
}
func NewRepoDownloader(p *Photocopy) *RepoDownloader {
return &RepoDownloader{
+
clients: make(map[string]*http.Client),
+
rateLimits: make(map[string]ratelimit.Limiter),
+
p: p,
+
processChan: make(chan ProcessJob, 1000),
}
}
···
bs := atproto_repo.NewTinyBlockstore()
cs, err := car.NewCarReader(bytes.NewReader(b))
if err != nil {
+
fmt.Println("error opening car", err)
return fmt.Errorf("error opening car: %v\n", err)
}
···
r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0])
if err != nil || r == nil {
+
fmt.Println("error opening repo", err)
fmt.Printf("could not open repo: %v", err)
return nil
}
···
Status string `json:"status"`
}
+
func (p *Photocopy) runProcessRepoWorker(ctx context.Context, jobs <-chan ProcessJob) {
+
for j := range jobs {
+
p.processRepo(ctx, j.repoBytes, j.did)
+
}
+
}
+
func (p *Photocopy) runBackfiller(ctx context.Context) error {
startTime := time.Now()
fmt.Println("querying clickhouse for dids and services...")
+
type alreadyFetchedItem struct {
+
Did string `ch:"did"`
+
}
+
var alreadyFetched []alreadyFetchedItem
+
if err := p.conn.Select(ctx, &alreadyFetched, "SELECT DISTINCT(did) FROM default.record WHERE created_at < '2025-07-01'"); err != nil {
+
return err
+
}
+
+
alreadyFetchedMap := map[string]bool{}
+
for _, d := range alreadyFetched {
+
alreadyFetchedMap[d.Did] = true
+
}
+
+
fmt.Println("getting dids")
+
var hostsCursor string
var sevs []ListServicesResponseItem
for {
···
downloader := NewRepoDownloader(p)
serviceDids := map[string][]string{}
+
for range runtime.NumCPU() / 2 {
+
go p.runProcessRepoWorker(ctx, downloader.processChan)
+
}
+
wg := sync.WaitGroup{}
mplk := sync.Mutex{}
for s := range servicesDids {
···
}
dids := []string{}
for _, r := range repos {
+
if alreadyFetchedMap[r.Did] {
+
skipped++
+
continue
+
}
dids = append(dids, r.Did)
}
mplk.Lock()
···
}
go func(b []byte, did string) {
+
downloader.processChan <- ProcessJob{repoBytes: b, did: did}
}(b, did)
processed++
-420
cmd/bodega/main.go
···
-
package main
-
-
import (
-
"bytes"
-
"context"
-
"encoding/json"
-
"fmt"
-
"io"
-
"log/slog"
-
"net/http"
-
"os"
-
"strings"
-
"sync"
-
"time"
-
-
"github.com/ClickHouse/clickhouse-go/v2"
-
atproto_repo "github.com/bluesky-social/indigo/atproto/repo"
-
"github.com/bluesky-social/indigo/atproto/syntax"
-
"github.com/bluesky-social/indigo/repo"
-
"github.com/bluesky-social/indigo/util"
-
"github.com/haileyok/photocopy/clickhouse_inserter"
-
"github.com/haileyok/photocopy/models"
-
"github.com/ipfs/go-cid"
-
"github.com/ipld/go-car"
-
_ "github.com/joho/godotenv/autoload"
-
"github.com/urfave/cli/v2"
-
"go.uber.org/ratelimit"
-
)
-
-
func main() {
-
app := cli.App{
-
Name: "bodega",
-
Action: run,
-
Flags: []cli.Flag{
-
&cli.StringFlag{
-
Name: "clickhouse-addr",
-
EnvVars: []string{"PHOTOCOPY_CLICKHOUSE_ADDR"},
-
Required: true,
-
},
-
&cli.StringFlag{
-
Name: "clickhouse-database",
-
EnvVars: []string{"PHOTOCOPY_CLICKHOUSE_DATABASE"},
-
Required: true,
-
},
-
&cli.StringFlag{
-
Name: "clickhouse-user",
-
EnvVars: []string{"PHOTOCOPY_CLICKHOUSE_USER"},
-
Value: "default",
-
},
-
&cli.StringFlag{
-
Name: "clickhouse-pass",
-
EnvVars: []string{"PHOTOCOPY_CLICKHOUSE_PASS"},
-
Required: true,
-
},
-
&cli.BoolFlag{
-
Name: "debug",
-
Value: false,
-
},
-
},
-
}
-
-
app.Run(os.Args)
-
}
-
-
type RepoDownloader struct {
-
clients map[string]*http.Client
-
rateLimits map[string]ratelimit.Limiter
-
mu sync.RWMutex
-
}
-
-
func NewRepoDownloader() *RepoDownloader {
-
return &RepoDownloader{
-
clients: make(map[string]*http.Client),
-
rateLimits: make(map[string]ratelimit.Limiter),
-
}
-
}
-
-
func (rd *RepoDownloader) getClient(service string) *http.Client {
-
rd.mu.RLock()
-
client, exists := rd.clients[service]
-
rd.mu.RUnlock()
-
-
if exists {
-
return client
-
}
-
-
rd.mu.Lock()
-
defer rd.mu.Unlock()
-
-
if client, exists := rd.clients[service]; exists {
-
return client
-
}
-
-
client = util.RobustHTTPClient()
-
client.Timeout = 30 * time.Minute
-
rd.clients[service] = client
-
return client
-
}
-
-
func (rd *RepoDownloader) getRateLimiter(service string) ratelimit.Limiter {
-
rd.mu.RLock()
-
limiter, exists := rd.rateLimits[service]
-
rd.mu.RUnlock()
-
-
if exists {
-
return limiter
-
}
-
-
rd.mu.Lock()
-
defer rd.mu.Unlock()
-
-
if limiter, exists := rd.rateLimits[service]; exists {
-
return limiter
-
}
-
-
// 3000 per five minutes
-
limiter = ratelimit.New(10)
-
rd.rateLimits[service] = limiter
-
return limiter
-
}
-
-
func (rd *RepoDownloader) downloadRepo(service, did string) ([]byte, error) {
-
dlurl := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", service, did)
-
-
req, err := http.NewRequestWithContext(context.TODO(), "GET", dlurl, nil)
-
if err != nil {
-
return nil, fmt.Errorf("failed to create request: %w", err)
-
}
-
-
client := rd.getClient(service)
-
-
resp, err := client.Do(req)
-
if err != nil {
-
return nil, fmt.Errorf("failed to download repo: %w", err)
-
}
-
defer resp.Body.Close()
-
-
if resp.StatusCode != http.StatusOK {
-
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
-
}
-
-
b, err := io.ReadAll(resp.Body)
-
if err != nil {
-
return nil, fmt.Errorf("could not read bytes from response: %w", err)
-
}
-
-
return b, nil
-
}
-
-
func processRepo(b []byte, did string, inserter *clickhouse_inserter.Inserter) error {
-
bs := atproto_repo.NewTinyBlockstore()
-
cs, err := car.NewCarReader(bytes.NewReader(b))
-
if err != nil {
-
return fmt.Errorf("error opening car: %v\n", err)
-
}
-
-
currBlock, _ := cs.Next()
-
for currBlock != nil {
-
bs.Put(context.TODO(), currBlock)
-
next, _ := cs.Next()
-
currBlock = next
-
}
-
-
r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0])
-
if err != nil || r == nil {
-
fmt.Printf("could not open repo: %v", err)
-
return nil
-
}
-
-
if err := r.ForEach(context.TODO(), "", func(key string, cid cid.Cid) error {
-
pts := strings.Split(key, "/")
-
nsid := pts[0]
-
rkey := pts[1]
-
cidStr := cid.String()
-
b, err := bs.Get(context.TODO(), cid)
-
if err != nil {
-
return nil
-
}
-
-
var cat time.Time
-
tid, err := syntax.ParseTID(rkey)
-
if err != nil {
-
cat = time.Now()
-
} else {
-
cat = tid.Time()
-
}
-
-
rec := models.Record{
-
Did: did,
-
Rkey: rkey,
-
Collection: nsid,
-
Cid: cidStr,
-
Seq: "",
-
Raw: string(b.RawData()),
-
CreatedAt: cat,
-
}
-
-
inserter.Insert(context.TODO(), rec)
-
-
return nil
-
}); err != nil {
-
return fmt.Errorf("erorr traversing records: %v", err)
-
}
-
-
return nil
-
}
-
-
type ListReposResponse struct {
-
Cursor string `json:"cursor"`
-
Repos []ListReposRepo `json:"repos"`
-
}
-
-
type ListReposRepo struct {
-
Did string `json:"did"`
-
Head string `json:"head"`
-
Rev string `json:"rev"`
-
Active bool `json:"active"`
-
Status *string `json:"status,omitempty"`
-
}
-
-
func (rd *RepoDownloader) getDidsFromService(ctx context.Context, service string) ([]ListReposRepo, error) {
-
var cursor string
-
var repos []ListReposRepo
-
for {
-
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/xrpc/com.atproto.sync.listRepos?limit=1000&cursor=%s", service, cursor), nil)
-
if err != nil {
-
return nil, err
-
}
-
-
rl := rd.getRateLimiter(service)
-
rl.Take()
-
-
cli := rd.getClient(service)
-
resp, err := cli.Do(req)
-
if err != nil {
-
return nil, err
-
}
-
defer resp.Body.Close()
-
-
if resp.StatusCode != http.StatusOK {
-
return nil, fmt.Errorf("received non-200 response code: %d", resp.StatusCode)
-
}
-
-
var reposResp ListReposResponse
-
if err := json.NewDecoder(resp.Body).Decode(&reposResp); err != nil {
-
return nil, fmt.Errorf("error decoding repos response: %w", err)
-
}
-
-
repos = append(repos, reposResp.Repos...)
-
-
if len(reposResp.Repos) != 1000 {
-
break
-
}
-
}
-
-
return repos, nil
-
}
-
-
var run = func(cmd *cli.Context) error {
-
startTime := time.Now()
-
-
conn, err := clickhouse.Open(&clickhouse.Options{
-
Addr: []string{cmd.String("clickhouse-addr")},
-
Auth: clickhouse.Auth{
-
Database: cmd.String("clickhouse-database"),
-
Username: cmd.String("clickhouse-user"),
-
Password: cmd.String("clickhouse-pass"),
-
},
-
})
-
if err != nil {
-
return err
-
}
-
defer conn.Close()
-
-
fmt.Println("querying clickhouse for dids and services...")
-
-
type servicesQueryRow struct {
-
PlcOpServices []string `ch:"plc_op_services"`
-
}
-
var servicesQueryRows []servicesQueryRow
-
if err := conn.Select(cmd.Context, &servicesQueryRows, `
-
SELECT DISTINCT(plc_op_services) FROM default.plc WHERE arrayExists(x -> x LIKE '%.bsky.network', plc_op_services)
-
`); err != nil {
-
return err
-
}
-
-
servicesDids := map[string][]string{}
-
for _, svcs := range servicesQueryRows {
-
for _, s := range svcs.PlcOpServices {
-
servicesDids[s] = []string{}
-
}
-
}
-
-
fmt.Printf("found %d services\n", len(servicesDids))
-
-
fmt.Printf("getting most recent record for each did...")
-
var records []models.Record
-
if err := conn.Select(cmd.Context, &records, `
-
SELECT did, created_at
-
FROM default.record
-
QUALIFY row_number() OVER (PARTITION BY did ORDER BY created_at ASC) = 1
-
`); err != nil {
-
return err
-
}
-
-
fmt.Printf("collecting dids...\n")
-
-
didCreatedAt := map[string]time.Time{}
-
for _, r := range records {
-
didCreatedAt[r.Did] = r.CreatedAt
-
}
-
-
inserter, err := clickhouse_inserter.New(context.TODO(), &clickhouse_inserter.Args{
-
BatchSize: 100000,
-
Logger: slog.Default(),
-
Conn: conn,
-
Query: "INSERT INTO record (did, rkey, collection, cid, seq, raw, created_at)",
-
RateLimit: 2, // two inserts per second in the event of massive repos
-
})
-
if err != nil {
-
return err
-
}
-
-
fmt.Printf("building download buckets...")
-
-
skipped := 0
-
total := 0
-
needOlderThan, _ := time.Parse(time.DateTime, "2025-06-28 04:18:22")
-
downloader := NewRepoDownloader()
-
serviceDids := map[string][]string{}
-
-
wg := sync.WaitGroup{}
-
for s := range servicesDids {
-
wg.Add(1)
-
go func() {
-
defer wg.Done()
-
repos, err := downloader.getDidsFromService(context.TODO(), s)
-
if err != nil {
-
fmt.Printf("error getting dids for services %s: %v", s, err)
-
return
-
}
-
dids := []string{}
-
for _, r := range repos {
-
lastRecord, exists := didCreatedAt[r.Did]
-
if exists && lastRecord.Before(needOlderThan) {
-
skipped++
-
continue
-
}
-
-
dids = append(dids, r.Did)
-
}
-
serviceDids[s] = dids
-
}()
-
}
-
-
fmt.Println("getting all the repos...")
-
wg.Wait()
-
-
fmt.Printf("Total jobs: %d across %d services \n", total, len(serviceDids))
-
fmt.Printf("was able to skip %d repos\n", skipped)
-
-
for service, dids := range serviceDids {
-
if len(dids) < 100 {
-
continue
-
}
-
fmt.Printf("%s: %d jobs\n", service, len(dids))
-
}
-
-
processed := 0
-
errored := 0
-
-
for service, dids := range serviceDids {
-
go func() {
-
for _, did := range dids {
-
ratelimiter := downloader.getRateLimiter(service)
-
ratelimiter.Take()
-
-
b, err := downloader.downloadRepo(service, did)
-
if err != nil {
-
errored++
-
processed++
-
continue
-
}
-
-
go func(b []byte, did string, inserter *clickhouse_inserter.Inserter) {
-
processRepo(b, did, inserter)
-
}(b, did, inserter)
-
-
processed++
-
}
-
}()
-
}
-
-
ticker := time.NewTicker(1 * time.Second)
-
defer ticker.Stop()
-
-
for range ticker.C {
-
elapsed := time.Since(startTime)
-
rate := float64(processed) / elapsed.Seconds()
-
remaining := total - processed
-
-
var eta string
-
if rate > 0 {
-
etaSeconds := float64(remaining) / rate
-
etaDuration := time.Duration(etaSeconds * float64(time.Second))
-
eta = fmt.Sprintf(", ETA: %v", etaDuration.Round(time.Second))
-
} else {
-
eta = ", ETA: calculating..."
-
}
-
-
fmt.Printf("\rProgress: %d/%d processed (%.1f%%), %d skipped, %d errors, %.1f jobs/sec%s",
-
processed, total, float64(processed)/float64(total)*100, skipped, errored, rate, eta)
-
}
-
-
fmt.Printf("\nCompleted: %d processed, %d errors\n", processed, errored)
-
-
inserter.Close(context.TODO())
-
-
return nil
-
}
···
-28
cmd/bodega/plc_models.go
···
-
package main
-
-
import (
-
"time"
-
)
-
-
type ClickhousePLCEntry struct {
-
Did string `ch:"did"`
-
Cid string `ch:"cid"`
-
Nullified bool `ch:"nullified"`
-
CreatedAt time.Time `ch:"created_at"`
-
PlcOpSig string `ch:"plc_op_sig"`
-
PlcOpPrev string `ch:"plc_op_prev"`
-
PlcOpType string `ch:"plc_op_type"`
-
PlcOpServices []string `ch:"plc_op_services"`
-
PlcOpAlsoKnownAs []string `ch:"plc_op_also_known_as"`
-
PlcOpRotationKeys []string `ch:"plc_op_rotation_keys"`
-
PlcTombSig string `ch:"plc_tomb_sig"`
-
PlcTombPrev string `ch:"plc_tomb_prev"`
-
PlcTombType string `ch:"plc_tomb_type"`
-
LegacyOpSig string `ch:"legacy_op_sig"`
-
LegacyOpPrev string `ch:"legacy_op_prev"`
-
LegacyOpType string `ch:"legacy_op_type"`
-
LegacyOpHandle string `ch:"legacy_op_handle"`
-
LegacyOpService string `ch:"legacy_op_service"`
-
LegacyOpSigningKey string `ch:"legacy_op_signing_key"`
-
LegacyOpRecoveryKey string `ch:"legacy_op_recovery_key"`
-
}
···
+10
cmd/photocopy/main.go
···
&cli.BoolFlag{
Name: "with-backfill",
},
},
Commands: cli.Commands{
&cli.Command{
···
ClickhouseUser: cmd.String("clickhouse-user"),
ClickhousePass: cmd.String("clickhouse-pass"),
RatelimitBypassKey: cmd.String("ratelimit-bypass-key"),
})
if err != nil {
panic(err)
···
&cli.BoolFlag{
Name: "with-backfill",
},
+
&cli.StringFlag{
+
Name: "nervana-endpoint",
+
EnvVars: []string{"PHOTOCOPY_NERVANA_ENDPOINT"},
+
},
+
&cli.StringFlag{
+
Name: "nervana-api-key",
+
EnvVars: []string{"PHOTOCOPY_NERVANA_API_KEY"},
+
},
},
Commands: cli.Commands{
&cli.Command{
···
ClickhouseUser: cmd.String("clickhouse-user"),
ClickhousePass: cmd.String("clickhouse-pass"),
RatelimitBypassKey: cmd.String("ratelimit-bypass-key"),
+
NervanaEndpoint: cmd.String("nervana-endpoint"),
+
NervanaApiKey: cmd.String("nervana-api-key"),
})
if err != nil {
panic(err)
+1 -1
go.mod
···
github.com/prometheus/client_golang v1.22.0
github.com/urfave/cli/v2 v2.25.7
go.uber.org/ratelimit v0.3.1
-
golang.org/x/sync v0.15.0
)
require (
···
golang.org/x/crypto v0.39.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/net v0.41.0 // indirect
golang.org/x/sys v0.33.0 // indirect
golang.org/x/text v0.26.0 // indirect
golang.org/x/time v0.11.0 // indirect
···
github.com/prometheus/client_golang v1.22.0
github.com/urfave/cli/v2 v2.25.7
go.uber.org/ratelimit v0.3.1
)
require (
···
golang.org/x/crypto v0.39.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/net v0.41.0 // indirect
+
golang.org/x/sync v0.15.0 // indirect
golang.org/x/sys v0.33.0 // indirect
golang.org/x/text v0.26.0 // indirect
golang.org/x/time v0.11.0 // indirect
+30
handle_create.go
···
"bytes"
"context"
"fmt"
"strings"
"time"
···
IndexedAt: indexedAt,
Did: did,
Lang: lang,
}
if rec.Reply != nil {
···
if err := p.inserters.postsInserter.Insert(ctx, post); err != nil {
return err
}
return nil
···
"bytes"
"context"
"fmt"
+
"slices"
"strings"
"time"
···
IndexedAt: indexedAt,
Did: did,
Lang: lang,
+
Text: rec.Text,
}
if rec.Reply != nil {
···
if err := p.inserters.postsInserter.Insert(ctx, post); err != nil {
return err
+
}
+
+
isEn := slices.Contains(rec.Langs, "en")
+
if rec.Text != "" && rec.Reply == nil && isEn && p.nervanaClient != nil {
+
go func(ctx context.Context, rec bsky.FeedPost, did, rkey string) {
+
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
+
defer cancel()
+
+
nervanaItems, err := p.nervanaClient.MakeRequest(ctx, rec.Text)
+
if err != nil {
+
p.logger.Error("error making nervana items request", "error", err)
+
return
+
}
+
+
for _, ni := range nervanaItems {
+
postLabel := models.PostLabel{
+
Did: did,
+
Rkey: rkey,
+
Text: ni.Text,
+
Label: ni.Label,
+
EntityId: ni.EntityId,
+
Description: ni.Description,
+
Topic: "",
+
CreatedAt: time.Now(),
+
}
+
p.inserters.labelsInserter.Insert(ctx, postLabel)
+
}
+
}(ctx, rec, did, rkey)
}
return nil
+1
models/post.go
···
QuoteUri string `ch:"quote_uri"`
QuoteDid string `ch:"quote_did"`
Lang string `ch:"lang"`
}
···
QuoteUri string `ch:"quote_uri"`
QuoteDid string `ch:"quote_did"`
Lang string `ch:"lang"`
+
Text string `ch:"text"`
}
+14
models/post_label.go
···
···
+
package models
+
+
import "time"
+
+
type PostLabel struct {
+
Did string `ch:"did"`
+
Rkey string `ch:"rkey"`
+
CreatedAt time.Time `ch:"created_at"`
+
Text string `ch:"text"`
+
Label string `ch:"label"`
+
EntityId string `ch:"entity_id"`
+
Description string `ch:"description"`
+
Topic string `ch:"topic"`
+
}
+77
nervana/client.go
···
···
+
package nervana
+
+
import (
+
"bytes"
+
"context"
+
"encoding/json"
+
"fmt"
+
"io"
+
"net/http"
+
"time"
+
)
+
+
type Client struct {
+
cli *http.Client
+
endpoint string
+
apiKey string
+
}
+
+
func NewClient(endpoint string, apiKey string) *Client {
+
return &Client{
+
cli: &http.Client{
+
Timeout: 5 * time.Second,
+
},
+
endpoint: endpoint,
+
apiKey: apiKey,
+
}
+
}
+
+
type NervanaItem struct {
+
Text string `json:"text"`
+
Label string `json:"label"`
+
EntityId string `json:"entityId"`
+
Description string `json:"description"`
+
}
+
+
func (c *Client) newRequest(ctx context.Context, text string) (*http.Request, error) {
+
payload := map[string]string{
+
"text": text,
+
"language": "en",
+
}
+
+
b, err := json.Marshal(payload)
+
if err != nil {
+
return nil, err
+
}
+
+
req, err := http.NewRequestWithContext(ctx, "GET", c.endpoint, bytes.NewReader(b))
+
+
req.Header.Set("Authorization", "Bearer "+c.apiKey)
+
+
return req, err
+
}
+
+
func (c *Client) MakeRequest(ctx context.Context, text string) ([]NervanaItem, error) {
+
req, err := c.newRequest(ctx, text)
+
if err != nil {
+
return nil, err
+
}
+
+
resp, err := c.cli.Do(req)
+
if err != nil {
+
return nil, err
+
}
+
defer resp.Body.Close()
+
+
if resp.StatusCode != 200 {
+
io.Copy(io.Discard, resp.Body)
+
return nil, fmt.Errorf("received non-200 response code: %d", resp.StatusCode)
+
}
+
+
var nervanaResp []NervanaItem
+
if err := json.NewDecoder(resp.Body).Decode(&nervanaResp); err != nil {
+
return nil, err
+
}
+
+
return nervanaResp, nil
+
}
+31 -5
photocopy.go
···
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/haileyok/photocopy/clickhouse_inserter"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
···
ratelimitBypassKey string
conn driver.Conn
}
type Inserters struct {
···
plcInserter *clickhouse_inserter.Inserter
recordsInserter *clickhouse_inserter.Inserter
deletesInserter *clickhouse_inserter.Inserter
}
type Args struct {
···
ClickhouseUser string
ClickhousePass string
RatelimitBypassKey string
}
func New(ctx context.Context, args *Args) (*Photocopy, error) {
···
fi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
PrometheusCounterPrefix: "photocopy_follows",
Histogram: insertionsHist,
-
BatchSize: 250_000,
Logger: p.logger,
Conn: conn,
Query: "INSERT INTO follow (uri, did, rkey, created_at, indexed_at, subject)",
···
pi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
PrometheusCounterPrefix: "photocopy_posts",
Histogram: insertionsHist,
-
BatchSize: 250_000,
Logger: p.logger,
Conn: conn,
Query: "INSERT INTO post (uri, did, rkey, created_at, indexed_at, root_uri, root_did, parent_uri, parent_did, quote_uri, quote_did, lang)",
···
ii, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
PrometheusCounterPrefix: "photocopy_interactions",
Histogram: insertionsHist,
-
BatchSize: 250_000,
Logger: p.logger,
Conn: conn,
Query: "INSERT INTO interaction (uri, did, rkey, kind, created_at, indexed_at, subject_uri, subject_did)",
···
ri, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
PrometheusCounterPrefix: "photocopy_records",
Histogram: insertionsHist,
-
BatchSize: 250_000,
Logger: p.logger,
Conn: conn,
Query: "INSERT INTO record (did, rkey, collection, cid, seq, raw, created_at)",
···
di, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
PrometheusCounterPrefix: "photocopy_deletes",
Histogram: insertionsHist,
-
BatchSize: 250_000,
Logger: p.logger,
Conn: conn,
Query: "INSERT INTO delete (did, rkey, created_at)",
···
return nil, err
}
is := &Inserters{
followsInserter: fi,
postsInserter: pi,
interactionsInserter: ii,
recordsInserter: ri,
deletesInserter: di,
}
p.inserters = is
···
p.inserters.plcInserter = plci
p.plcScraper = plcs
return p, nil
}
···
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/haileyok/photocopy/clickhouse_inserter"
+
"github.com/haileyok/photocopy/nervana"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
···
ratelimitBypassKey string
conn driver.Conn
+
+
nervanaClient *nervana.Client
+
nervanaEndpoint string
+
nervanaApiKey string
}
type Inserters struct {
···
plcInserter *clickhouse_inserter.Inserter
recordsInserter *clickhouse_inserter.Inserter
deletesInserter *clickhouse_inserter.Inserter
+
labelsInserter *clickhouse_inserter.Inserter
}
type Args struct {
···
ClickhouseUser string
ClickhousePass string
RatelimitBypassKey string
+
NervanaEndpoint string
+
NervanaApiKey string
}
func New(ctx context.Context, args *Args) (*Photocopy, error) {
···
fi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
PrometheusCounterPrefix: "photocopy_follows",
Histogram: insertionsHist,
+
BatchSize: 500,
Logger: p.logger,
Conn: conn,
Query: "INSERT INTO follow (uri, did, rkey, created_at, indexed_at, subject)",
···
pi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
PrometheusCounterPrefix: "photocopy_posts",
Histogram: insertionsHist,
+
BatchSize: 300,
Logger: p.logger,
Conn: conn,
Query: "INSERT INTO post (uri, did, rkey, created_at, indexed_at, root_uri, root_did, parent_uri, parent_did, quote_uri, quote_did, lang)",
···
ii, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
PrometheusCounterPrefix: "photocopy_interactions",
Histogram: insertionsHist,
+
BatchSize: 1000,
Logger: p.logger,
Conn: conn,
Query: "INSERT INTO interaction (uri, did, rkey, kind, created_at, indexed_at, subject_uri, subject_did)",
···
ri, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
PrometheusCounterPrefix: "photocopy_records",
Histogram: insertionsHist,
+
BatchSize: 2500,
Logger: p.logger,
Conn: conn,
Query: "INSERT INTO record (did, rkey, collection, cid, seq, raw, created_at)",
···
di, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
PrometheusCounterPrefix: "photocopy_deletes",
Histogram: insertionsHist,
+
BatchSize: 500,
Logger: p.logger,
Conn: conn,
Query: "INSERT INTO delete (did, rkey, created_at)",
···
return nil, err
}
+
li, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
+
PrometheusCounterPrefix: "photocopy_labels",
+
Histogram: insertionsHist,
+
BatchSize: 100,
+
Logger: p.logger,
+
Conn: conn,
+
Query: "INSERT INTO post_label (did, rkey, text, label, entity_id, description, topic, created_at)",
+
RateLimit: 3,
+
})
+
if err != nil {
+
return nil, err
+
}
+
is := &Inserters{
followsInserter: fi,
postsInserter: pi,
interactionsInserter: ii,
recordsInserter: ri,
deletesInserter: di,
+
labelsInserter: li,
}
p.inserters = is
···
p.inserters.plcInserter = plci
p.plcScraper = plcs
+
+
if args.NervanaApiKey != "" && args.NervanaEndpoint != "" {
+
p.nervanaClient = nervana.NewClient(args.NervanaEndpoint, args.NervanaApiKey)
+
}
return p, nil
}