this repo has no description

Compare changes

Choose any two refs to compare.

+1
Makefile
···
.PHONY: build
build: ## Build all executables
go build -ldflags "-X main.Version=$(VERSION)" -o photocopy ./cmd/photocopy
+
go build -o bodega ./cmd/bodega
.PHONY: run
run:
+445
backfiller.go
···
+
package photocopy
+
+
import (
+
"bytes"
+
"context"
+
"encoding/json"
+
"fmt"
+
"io"
+
"net/http"
+
"runtime"
+
"strings"
+
"sync"
+
"time"
+
+
atproto_repo "github.com/bluesky-social/indigo/atproto/repo"
+
"github.com/bluesky-social/indigo/repo"
+
"github.com/bluesky-social/indigo/util"
+
"github.com/ipfs/go-cid"
+
"github.com/ipld/go-car"
+
_ "github.com/joho/godotenv/autoload"
+
"go.uber.org/ratelimit"
+
)
+
+
type ProcessJob struct {
+
did string
+
repoBytes []byte
+
}
+
+
type RepoDownloader struct {
+
clients map[string]*http.Client
+
rateLimits map[string]ratelimit.Limiter
+
processChan chan ProcessJob
+
mu sync.RWMutex
+
p *Photocopy
+
}
+
+
func NewRepoDownloader(p *Photocopy) *RepoDownloader {
+
return &RepoDownloader{
+
clients: make(map[string]*http.Client),
+
rateLimits: make(map[string]ratelimit.Limiter),
+
p: p,
+
processChan: make(chan ProcessJob, 1000),
+
}
+
}
+
+
func (rd *RepoDownloader) getClient(service string) *http.Client {
+
rd.mu.RLock()
+
client, exists := rd.clients[service]
+
rd.mu.RUnlock()
+
+
if exists {
+
return client
+
}
+
+
rd.mu.Lock()
+
defer rd.mu.Unlock()
+
+
if client, exists := rd.clients[service]; exists {
+
return client
+
}
+
+
client = util.RobustHTTPClient()
+
client.Timeout = 45 * time.Second
+
rd.clients[service] = client
+
return client
+
}
+
+
func (rd *RepoDownloader) getRateLimiter(service string) ratelimit.Limiter {
+
if !strings.HasSuffix(service, ".bsky.network") {
+
service = "third-party"
+
}
+
+
rd.mu.RLock()
+
limiter, exists := rd.rateLimits[service]
+
rd.mu.RUnlock()
+
+
if exists {
+
return limiter
+
}
+
+
rd.mu.Lock()
+
defer rd.mu.Unlock()
+
+
if limiter, exists := rd.rateLimits[service]; exists {
+
return limiter
+
}
+
+
// 3000 per five minutes
+
limiter = ratelimit.New(10)
+
rd.rateLimits[service] = limiter
+
return limiter
+
}
+
+
func (rd *RepoDownloader) downloadRepo(service, did string) ([]byte, error) {
+
dlurl := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", service, did)
+
+
req, err := http.NewRequestWithContext(context.TODO(), "GET", dlurl, nil)
+
if err != nil {
+
return nil, fmt.Errorf("failed to create request: %w", err)
+
}
+
+
if rd.p.ratelimitBypassKey != "" && strings.HasSuffix(service, ".bsky.network") {
+
req.Header.Set("x-ratelimit-bypass", rd.p.ratelimitBypassKey)
+
}
+
+
client := rd.getClient(service)
+
+
resp, err := client.Do(req)
+
if err != nil {
+
return nil, fmt.Errorf("failed to download repo: %w", err)
+
}
+
defer resp.Body.Close()
+
+
if resp.StatusCode != http.StatusOK {
+
if resp.StatusCode == 400 {
+
return nil, nil
+
}
+
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
+
}
+
+
b, err := io.ReadAll(resp.Body)
+
if err != nil {
+
return nil, fmt.Errorf("could not read bytes from response: %w", err)
+
}
+
+
return b, nil
+
}
+
+
func (p *Photocopy) processRepo(ctx context.Context, b []byte, did string) error {
+
bs := atproto_repo.NewTinyBlockstore()
+
cs, err := car.NewCarReader(bytes.NewReader(b))
+
if err != nil {
+
fmt.Println("error opening car", err)
+
return fmt.Errorf("error opening car: %v\n", err)
+
}
+
+
currBlock, _ := cs.Next()
+
for currBlock != nil {
+
bs.Put(context.TODO(), currBlock)
+
next, _ := cs.Next()
+
currBlock = next
+
}
+
+
r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0])
+
if err != nil || r == nil {
+
fmt.Println("error opening repo", err)
+
fmt.Printf("could not open repo: %v", err)
+
return nil
+
}
+
+
if err := r.ForEach(context.TODO(), "", func(key string, cid cid.Cid) error {
+
pts := strings.Split(key, "/")
+
nsid := pts[0]
+
rkey := pts[1]
+
cidStr := cid.String()
+
b, err := bs.Get(context.TODO(), cid)
+
if err != nil {
+
return nil
+
}
+
if err := p.handleCreate(ctx, b.RawData(), time.Now().Format(time.RFC3339Nano), "unk", did, nsid, rkey, cidStr, "unk"); err != nil {
+
return err
+
}
+
return nil
+
}); err != nil {
+
return fmt.Errorf("erorr traversing records: %v", err)
+
}
+
+
return nil
+
}
+
+
type ListReposResponse struct {
+
Cursor string `json:"cursor"`
+
Repos []ListReposRepo `json:"repos"`
+
}
+
+
type ListReposRepo struct {
+
Did string `json:"did"`
+
Head string `json:"head"`
+
Rev string `json:"rev"`
+
Active bool `json:"active"`
+
Status *string `json:"status,omitempty"`
+
}
+
+
func (rd *RepoDownloader) getDidsFromService(ctx context.Context, service string) ([]ListReposRepo, error) {
+
var cursor string
+
var repos []ListReposRepo
+
if service == "https://atproto.brid.gy" {
+
return nil, nil
+
}
+
for {
+
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/xrpc/com.atproto.sync.listRepos?limit=1000&cursor=%s", service, cursor), nil)
+
if err != nil {
+
return nil, err
+
}
+
+
if rd.p.ratelimitBypassKey != "" && strings.HasSuffix(service, ".bsky.network") {
+
req.Header.Set("x-ratelimit-bypass", rd.p.ratelimitBypassKey)
+
}
+
+
rl := rd.getRateLimiter(service)
+
rl.Take()
+
+
cli := rd.getClient(service)
+
resp, err := cli.Do(req)
+
if err != nil {
+
return nil, err
+
}
+
defer resp.Body.Close()
+
+
if resp.StatusCode != http.StatusOK {
+
return nil, fmt.Errorf("received non-200 response code: %d", resp.StatusCode)
+
}
+
+
var reposResp ListReposResponse
+
if err := json.NewDecoder(resp.Body).Decode(&reposResp); err != nil {
+
return nil, fmt.Errorf("error decoding repos response: %w", err)
+
}
+
+
for _, repo := range reposResp.Repos {
+
if repo.Status != nil {
+
if *repo.Status == "deleted" || *repo.Status == "takendown" || *repo.Status == "deactivated" {
+
continue
+
}
+
}
+
+
repos = append(repos, repo)
+
}
+
+
if len(reposResp.Repos) != 1000 || reposResp.Cursor == "" {
+
break
+
}
+
+
fmt.Printf("cursor %s service %s\n", reposResp.Cursor, service)
+
+
cursor = reposResp.Cursor
+
}
+
+
return repos, nil
+
}
+
+
type ListServicesResponse struct {
+
Cursor string `json:"cursor"`
+
Hosts []ListServicesResponseItem `json:"hosts"`
+
}
+
+
type ListServicesResponseItem struct {
+
Hostname string `json:"hostname"`
+
Status string `json:"status"`
+
}
+
+
func (p *Photocopy) runProcessRepoWorker(ctx context.Context, jobs <-chan ProcessJob) {
+
for j := range jobs {
+
p.processRepo(ctx, j.repoBytes, j.did)
+
}
+
}
+
+
func (p *Photocopy) runBackfiller(ctx context.Context) error {
+
startTime := time.Now()
+
+
fmt.Println("querying clickhouse for dids and services...")
+
+
type alreadyFetchedItem struct {
+
Did string `ch:"did"`
+
}
+
var alreadyFetched []alreadyFetchedItem
+
if err := p.conn.Select(ctx, &alreadyFetched, "SELECT DISTINCT(did) FROM default.record WHERE created_at < '2025-07-01'"); err != nil {
+
return err
+
}
+
+
alreadyFetchedMap := map[string]bool{}
+
for _, d := range alreadyFetched {
+
alreadyFetchedMap[d.Did] = true
+
}
+
+
fmt.Println("getting dids")
+
+
var hostsCursor string
+
var sevs []ListServicesResponseItem
+
for {
+
if hostsCursor != "" {
+
hostsCursor = "&cursor=" + hostsCursor
+
}
+
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("https://relay1.us-east.bsky.network/xrpc/com.atproto.sync.listHosts?limit=1000%s", hostsCursor), nil)
+
if err != nil {
+
return err
+
}
+
+
resp, err := http.DefaultClient.Do(req)
+
if err != nil {
+
return err
+
}
+
defer resp.Body.Close()
+
+
if resp.StatusCode != http.StatusOK {
+
return fmt.Errorf("received non-200 response code: %d", resp.StatusCode)
+
}
+
+
var sevsResp ListServicesResponse
+
if err := json.NewDecoder(resp.Body).Decode(&sevsResp); err != nil {
+
return fmt.Errorf("error decoding sevs response: %w", err)
+
}
+
+
for _, sev := range sevsResp.Hosts {
+
if sev.Status != "active" {
+
continue
+
}
+
+
sevs = append(sevs, sev)
+
}
+
+
if len(sevsResp.Hosts) != 1000 || sevsResp.Cursor == "" {
+
break
+
}
+
+
hostsCursor = sevsResp.Cursor
+
}
+
+
servicesDids := map[string][]string{}
+
for _, sev := range sevs {
+
servicesDids["https://"+sev.Hostname] = []string{}
+
}
+
+
fmt.Printf("found %d services\n", len(servicesDids))
+
+
fmt.Printf("collecting dids...\n")
+
+
fmt.Printf("building download buckets...")
+
+
skipped := 0
+
downloader := NewRepoDownloader(p)
+
serviceDids := map[string][]string{}
+
+
for range runtime.NumCPU() / 2 {
+
go p.runProcessRepoWorker(ctx, downloader.processChan)
+
}
+
+
wg := sync.WaitGroup{}
+
mplk := sync.Mutex{}
+
for s := range servicesDids {
+
wg.Add(1)
+
go func() {
+
defer wg.Done()
+
repos, err := downloader.getDidsFromService(context.TODO(), s)
+
if err != nil {
+
fmt.Printf("error getting dids for services %s: %v", s, err)
+
return
+
}
+
dids := []string{}
+
for _, r := range repos {
+
if alreadyFetchedMap[r.Did] {
+
skipped++
+
continue
+
}
+
dids = append(dids, r.Did)
+
}
+
mplk.Lock()
+
defer mplk.Unlock()
+
serviceDids[s] = dids
+
}()
+
}
+
+
fmt.Println("getting all the repos...")
+
wg.Wait()
+
+
fmt.Printf("was able to skip %d repos\n", skipped)
+
+
total := 0
+
+
for service, dids := range serviceDids {
+
if len(dids) < 100 {
+
continue
+
}
+
fmt.Printf("%s: %d jobs\n", service, len(dids))
+
total += len(dids)
+
}
+
+
fmt.Printf("Total jobs: %d across %d services \n", total, len(serviceDids))
+
+
for _, c := range downloader.clients {
+
c.Timeout = 10 * time.Minute
+
}
+
+
for s := range downloader.rateLimits {
+
if p.ratelimitBypassKey != "" && strings.HasSuffix(s, ".bsky.network") {
+
downloader.rateLimits[s] = ratelimit.New(25)
+
}
+
}
+
+
processed := 0
+
errored := 0
+
var errors []error
+
for service, dids := range serviceDids {
+
go func() {
+
for _, did := range dids {
+
ratelimiter := downloader.getRateLimiter(service)
+
ratelimiter.Take()
+
+
b, err := downloader.downloadRepo(service, did)
+
if err != nil {
+
errored++
+
processed++
+
errors = append(errors, err)
+
continue
+
}
+
+
go func(b []byte, did string) {
+
downloader.processChan <- ProcessJob{repoBytes: b, did: did}
+
}(b, did)
+
+
processed++
+
}
+
}()
+
}
+
+
ticker := time.NewTicker(1 * time.Second)
+
defer ticker.Stop()
+
+
for range ticker.C {
+
elapsed := time.Since(startTime)
+
rate := float64(processed) / elapsed.Seconds()
+
remaining := total - processed
+
+
var eta string
+
if rate > 0 {
+
etaSeconds := float64(remaining) / rate
+
etaDuration := time.Duration(etaSeconds * float64(time.Second))
+
eta = fmt.Sprintf(", ETA: %v", etaDuration.Round(time.Second))
+
} else {
+
eta = ", ETA: calculating..."
+
}
+
+
for _, err := range errors {
+
fmt.Printf("%v\n", err)
+
}
+
+
errors = nil
+
+
fmt.Printf("\rProgress: %d/%d processed (%.1f%%), %d skipped, %d errors, %.1f jobs/sec%s",
+
processed, total, float64(processed)/float64(total)*100, skipped, errored, rate, eta)
+
}
+
+
fmt.Printf("\nCompleted: %d processed, %d errors\n", processed, errored)
+
+
return nil
+
}
+179
clickhouse_inserter/inserter.go
···
+
package clickhouse_inserter
+
+
import (
+
"context"
+
"log/slog"
+
"reflect"
+
"slices"
+
"sync"
+
"time"
+
+
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
+
"github.com/prometheus/client_golang/prometheus"
+
"github.com/prometheus/client_golang/prometheus/promauto"
+
"go.uber.org/ratelimit"
+
)
+
+
type Inserter struct {
+
conn driver.Conn
+
query string
+
mu sync.Mutex
+
queuedEvents []any
+
batchSize int
+
insertsCounter *prometheus.CounterVec
+
pendingSends prometheus.Gauge
+
histogram *prometheus.HistogramVec
+
logger *slog.Logger
+
prefix string
+
rateLimit ratelimit.Limiter
+
}
+
+
type Args struct {
+
Conn driver.Conn
+
Query string
+
BatchSize int
+
PrometheusCounterPrefix string
+
Logger *slog.Logger
+
Histogram *prometheus.HistogramVec
+
RateLimit int
+
}
+
+
func New(ctx context.Context, args *Args) (*Inserter, error) {
+
if args.Logger == nil {
+
args.Logger = slog.Default()
+
}
+
+
inserter := &Inserter{
+
conn: args.Conn,
+
query: args.Query,
+
mu: sync.Mutex{},
+
batchSize: args.BatchSize,
+
histogram: args.Histogram,
+
logger: args.Logger,
+
prefix: args.PrometheusCounterPrefix,
+
}
+
+
if args.RateLimit != 0 {
+
rateLimit := ratelimit.New(args.RateLimit)
+
inserter.rateLimit = rateLimit
+
}
+
+
if args.PrometheusCounterPrefix != "" {
+
inserter.insertsCounter = promauto.NewCounterVec(prometheus.CounterOpts{
+
Name: "clickhouse_inserts",
+
Namespace: args.PrometheusCounterPrefix,
+
Help: "total inserts into clickhouse by status",
+
}, []string{"status"})
+
+
inserter.pendingSends = promauto.NewGauge(prometheus.GaugeOpts{
+
Name: "clickhouse_pending_sends",
+
Namespace: args.PrometheusCounterPrefix,
+
Help: "total clickhouse insertions that are in progress",
+
})
+
+
} else {
+
args.Logger.Info("no prometheus prefix provided, no metrics will be registered for this counter", "query", args.Query)
+
}
+
+
return inserter, nil
+
}
+
+
func (i *Inserter) Insert(ctx context.Context, e any) error {
+
i.mu.Lock()
+
+
i.queuedEvents = append(i.queuedEvents, e)
+
+
var toInsert []any
+
if len(i.queuedEvents) >= i.batchSize {
+
toInsert = slices.Clone(i.queuedEvents)
+
i.queuedEvents = nil
+
}
+
+
i.mu.Unlock()
+
+
if len(toInsert) > 0 {
+
i.sendStream(ctx, toInsert)
+
}
+
+
return nil
+
}
+
+
func (i *Inserter) Close(ctx context.Context) error {
+
i.mu.Lock()
+
+
var toInsert []any
+
+
if len(i.queuedEvents) > 0 {
+
toInsert = slices.Clone(i.queuedEvents)
+
i.queuedEvents = nil
+
}
+
+
i.mu.Unlock()
+
+
if len(toInsert) > 0 {
+
i.sendStream(ctx, toInsert)
+
}
+
+
return nil
+
}
+
+
func (i *Inserter) sendStream(ctx context.Context, toInsert []any) {
+
if i.pendingSends != nil {
+
i.pendingSends.Inc()
+
defer i.pendingSends.Dec()
+
}
+
+
if i.histogram != nil {
+
start := time.Now()
+
defer func() {
+
i.histogram.WithLabelValues(i.prefix).Observe(time.Since(start).Seconds())
+
}()
+
}
+
+
if len(toInsert) == 0 {
+
return
+
}
+
+
status := "ok"
+
if i.insertsCounter != nil {
+
defer func() {
+
i.insertsCounter.WithLabelValues(status).Add(float64(len(toInsert)))
+
}()
+
}
+
+
batch, err := i.conn.PrepareBatch(ctx, i.query)
+
if err != nil {
+
i.logger.Error("error creating batch", "prefix", i.prefix, "error", err)
+
status = "failed"
+
return
+
}
+
+
for _, d := range toInsert {
+
var structPtr any
+
if reflect.TypeOf(d).Kind() == reflect.Ptr {
+
structPtr = d
+
} else {
+
v := reflect.ValueOf(d)
+
if v.CanAddr() {
+
structPtr = v.Addr().Addr().Interface()
+
} else {
+
ptr := reflect.New(v.Type())
+
ptr.Elem().Set(v)
+
structPtr = ptr.Interface()
+
}
+
}
+
+
if err := batch.AppendStruct(structPtr); err != nil {
+
i.logger.Error("error appending to batch", "prefix", i.prefix, "error", err)
+
}
+
}
+
+
if i.rateLimit != nil {
+
i.rateLimit.Take()
+
}
+
+
if err := batch.Send(); err != nil {
+
status = "failed"
+
i.logger.Error("error sending batch", "prefix", i.prefix, "error", err)
+
}
+
}
+54 -5
cmd/photocopy/main.go
···
func main() {
app := cli.App{
-
Name: "photocopy",
-
Usage: "bigquery inserter for firehose events",
-
Action: run,
+
Name: "photocopy",
+
Usage: "bigquery inserter for firehose events",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "relay-host",
···
&cli.StringFlag{
Name: "plc-scraper-cursor-file",
EnvVars: []string{"PHOTOCOPY_PLC_SCRAPER_CURSOR_FILE"},
+
Required: true,
+
},
+
&cli.StringFlag{
+
Name: "clickhouse-addr",
+
EnvVars: []string{"PHOTOCOPY_CLICKHOUSE_ADDR"},
+
Required: true,
+
},
+
&cli.StringFlag{
+
Name: "clickhouse-database",
+
EnvVars: []string{"PHOTOCOPY_CLICKHOUSE_DATABASE"},
Required: true,
},
&cli.StringFlag{
···
EnvVars: []string{"PHOTOCOPY_CLICKHOUSE_PASS"},
Required: true,
},
+
&cli.StringFlag{
+
Name: "ratelimit-bypass-key",
+
EnvVars: []string{"PHOTOCOPY_RATELIMIT_BYPASS_KEY"},
+
Required: false,
+
},
+
&cli.BoolFlag{
+
Name: "with-backfill",
+
},
+
&cli.StringFlag{
+
Name: "nervana-endpoint",
+
EnvVars: []string{"PHOTOCOPY_NERVANA_ENDPOINT"},
+
},
+
&cli.StringFlag{
+
Name: "nervana-api-key",
+
EnvVars: []string{"PHOTOCOPY_NERVANA_API_KEY"},
+
},
+
},
+
Commands: cli.Commands{
+
&cli.Command{
+
Name: "run",
+
Action: run,
+
},
+
&cli.Command{
+
Name: "fetch-repos",
+
Action: runFetchRepos,
+
},
},
ErrWriter: os.Stderr,
}
···
MetricsAddr: cmd.String("metrics-addr"),
CursorFile: cmd.String("cursor-file"),
PLCScraperCursorFile: cmd.String("plc-scraper-cursor-file"),
+
ClickhouseAddr: cmd.String("clickhouse-addr"),
+
ClickhouseDatabase: cmd.String("clickhouse-database"),
ClickhouseUser: cmd.String("clickhouse-user"),
ClickhousePass: cmd.String("clickhouse-pass"),
+
RatelimitBypassKey: cmd.String("ratelimit-bypass-key"),
+
NervanaEndpoint: cmd.String("nervana-endpoint"),
+
NervanaApiKey: cmd.String("nervana-api-key"),
})
if err != nil {
panic(err)
···
sig := <-exitSignals
l.Info("received os exit signal", "signal", sig)
+
cancel()
}()
-
if err := p.Run(ctx); err != nil {
-
return err
+
if err := p.Run(ctx, cmd.Bool("with-backfill")); err != nil {
+
panic(err)
}
return nil
}
+
+
var runFetchRepos = func(cmd *cli.Context) error {
+
ctx := cmd.Context
+
ctx, cancel := context.WithCancel(ctx)
+
defer cancel()
+
+
return nil
+
}
+1 -10
consumer.go
···
"net/http"
"net/url"
"os"
-
"strings"
"time"
"github.com/bluesky-social/indigo/api/atproto"
···
continue
}
-
if !strings.HasPrefix(collection.String(), "app.bsky.") {
-
continue
-
}
-
-
if err := p.handleCreate(ctx, *rec, evt.Time, evt.Rev, did.String(), collection.String(), rkey.String(), reccid.String()); err != nil {
+
if err := p.handleCreate(ctx, *rec, evt.Time, evt.Rev, did.String(), collection.String(), rkey.String(), reccid.String(), fmt.Sprintf("%d", evt.Seq)); err != nil {
p.logger.Error("error handling create event", "error", err)
continue
}
case repomgr.EvtKindDeleteRecord:
-
if !strings.HasPrefix(collection.String(), "app.bsky.") {
-
continue
-
}
-
if err := p.handleDelete(ctx, did.String(), collection.String(), rkey.String()); err != nil {
p.logger.Error("error handling delete event", "error", err)
continue
+12 -24
go.mod
···
go 1.24.4
require (
-
cloud.google.com/go/bigquery v1.69.0
+
github.com/ClickHouse/clickhouse-go/v2 v2.37.2
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/bluesky-social/indigo v0.0.0-20250626183556-5641d3c27325
github.com/gorilla/websocket v1.5.1
github.com/ipfs/go-cid v0.5.0
+
github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4
github.com/joho/godotenv v1.5.1
github.com/prometheus/client_golang v1.22.0
github.com/urfave/cli/v2 v2.25.7
+
go.uber.org/ratelimit v0.3.1
)
require (
-
cloud.google.com/go v0.121.0 // indirect
-
cloud.google.com/go/auth v0.16.1 // indirect
-
cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect
-
cloud.google.com/go/compute/metadata v0.6.0 // indirect
-
cloud.google.com/go/iam v1.5.2 // indirect
+
github.com/ClickHouse/ch-go v0.66.1 // indirect
github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b // indirect
-
github.com/apache/arrow/go/v15 v15.0.2 // indirect
+
github.com/andybalholm/brotli v1.1.1 // indirect
+
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/carlmjohnson/versioninfo v0.22.5 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
+
github.com/go-faster/city v1.0.1 // indirect
+
github.com/go-faster/errors v0.7.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
-
github.com/goccy/go-json v0.10.2 // indirect
github.com/gocql/gocql v1.7.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
-
github.com/google/flatbuffers v23.5.26+incompatible // indirect
-
github.com/google/s2a-go v0.1.9 // indirect
github.com/google/uuid v1.6.0 // indirect
-
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
-
github.com/googleapis/gax-go/v2 v2.14.1 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-retryablehttp v0.7.5 // indirect
···
github.com/ipfs/go-merkledag v0.11.0 // indirect
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
github.com/ipfs/go-verifcid v0.0.3 // indirect
-
github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 // indirect
github.com/ipld/go-codec-dagpb v1.6.0 // indirect
github.com/ipld/go-ipld-prime v0.21.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
···
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
+
github.com/paulmach/orb v0.11.1 // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
+
github.com/segmentio/asm v1.2.0 // indirect
+
github.com/shopspring/decimal v1.4.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/whyrusleeping/cbor-gen v0.2.1-0.20241030202151-b7a6831be65e // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
-
github.com/zeebo/xxh3 v1.0.2 // indirect
gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b // indirect
gitlab.com/yawning/tuplehash v0.0.0-20230713102510-df83abbf9a02 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
-
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
go.opentelemetry.io/otel v1.36.0 // indirect
go.opentelemetry.io/otel/metric v1.36.0 // indirect
-
go.opentelemetry.io/otel/sdk v1.36.0 // indirect
go.opentelemetry.io/otel/trace v1.36.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.39.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
-
golang.org/x/mod v0.25.0 // indirect
golang.org/x/net v0.41.0 // indirect
-
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sync v0.15.0 // indirect
golang.org/x/sys v0.33.0 // indirect
golang.org/x/text v0.26.0 // indirect
golang.org/x/time v0.11.0 // indirect
-
golang.org/x/tools v0.33.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
-
google.golang.org/api v0.232.0 // indirect
-
google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb // indirect
-
google.golang.org/genproto/googleapis/api v0.0.0-20250505200425-f936aa4a68b2 // indirect
-
google.golang.org/genproto/googleapis/rpc v0.0.0-20250505200425-f936aa4a68b2 // indirect
-
google.golang.org/grpc v1.72.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
+
gopkg.in/yaml.v3 v3.0.1 // indirect
gorm.io/driver/postgres v1.5.7 // indirect
gorm.io/gorm v1.30.0 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
+43 -81
go.sum
···
-
cel.dev/expr v0.20.0 h1:OunBvVCfvpWlt4dN7zg3FM6TDkzOePe1+foGJ9AXeeI=
-
cel.dev/expr v0.20.0/go.mod h1:MrpN08Q+lEBs+bGYdLxxHkZoUSsCp0nSKTs0nTymJgw=
-
cloud.google.com/go v0.121.0 h1:pgfwva8nGw7vivjZiRfrmglGWiCJBP+0OmDpenG/Fwg=
-
cloud.google.com/go v0.121.0/go.mod h1:rS7Kytwheu/y9buoDmu5EIpMMCI4Mb8ND4aeN4Vwj7Q=
-
cloud.google.com/go/auth v0.16.1 h1:XrXauHMd30LhQYVRHLGvJiYeczweKQXZxsTbV9TiguU=
-
cloud.google.com/go/auth v0.16.1/go.mod h1:1howDHJ5IETh/LwYs3ZxvlkXF48aSqqJUM+5o02dNOI=
-
cloud.google.com/go/auth/oauth2adapt v0.2.8 h1:keo8NaayQZ6wimpNSmW5OPc283g65QNIiLpZnkHRbnc=
-
cloud.google.com/go/auth/oauth2adapt v0.2.8/go.mod h1:XQ9y31RkqZCcwJWNSx2Xvric3RrU88hAYYbjDWYDL+c=
-
cloud.google.com/go/bigquery v1.69.0 h1:rZvHnjSUs5sHK3F9awiuFk2PeOaB8suqNuim21GbaTc=
-
cloud.google.com/go/bigquery v1.69.0/go.mod h1:TdGLquA3h/mGg+McX+GsqG9afAzTAcldMjqhdjHTLew=
-
cloud.google.com/go/compute/metadata v0.6.0 h1:A6hENjEsCDtC1k8byVsgwvVcioamEHvZ4j01OwKxG9I=
-
cloud.google.com/go/compute/metadata v0.6.0/go.mod h1:FjyFAW1MW0C203CEOMDTu3Dk1FlqW3Rga40jzHL4hfg=
-
cloud.google.com/go/datacatalog v1.26.0 h1:eFgygb3DTufTWWUB8ARk+dSuXz+aefNJXTlkWlQcWwE=
-
cloud.google.com/go/datacatalog v1.26.0/go.mod h1:bLN2HLBAwB3kLTFT5ZKLHVPj/weNz6bR0c7nYp0LE14=
-
cloud.google.com/go/iam v1.5.2 h1:qgFRAGEmd8z6dJ/qyEchAuL9jpswyODjA2lS+w234g8=
-
cloud.google.com/go/iam v1.5.2/go.mod h1:SE1vg0N81zQqLzQEwxL2WI6yhetBdbNQuTvIKCSkUHE=
-
cloud.google.com/go/longrunning v0.6.7 h1:IGtfDWHhQCgCjwQjV9iiLnUta9LBCo8R9QmAFsS/PrE=
-
cloud.google.com/go/longrunning v0.6.7/go.mod h1:EAFV3IZAKmM56TyiE6VAP3VoTzhZzySwI/YI1s/nRsY=
-
cloud.google.com/go/monitoring v1.24.0 h1:csSKiCJ+WVRgNkRzzz3BPoGjFhjPY23ZTcaenToJxMM=
-
cloud.google.com/go/monitoring v1.24.0/go.mod h1:Bd1PRK5bmQBQNnuGwHBfUamAV1ys9049oEPHnn4pcsc=
-
cloud.google.com/go/storage v1.53.0 h1:gg0ERZwL17pJ+Cz3cD2qS60w1WMDnwcm5YPAIQBHUAw=
-
cloud.google.com/go/storage v1.53.0/go.mod h1:7/eO2a/srr9ImZW9k5uufcNahT2+fPb8w5it1i5boaA=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
-
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.27.0 h1:ErKg/3iS1AKcTkf3yixlZ54f9U1rljCkQyEXWUnIUxc=
-
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.27.0/go.mod h1:yAZHSGnqScoU556rBOVkwLze6WP5N+U11RHuWaGVxwY=
-
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.51.0 h1:fYE9p3esPxA/C0rQ0AHhP0drtPXDRhaWiwg1DPqO7IU=
-
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.51.0/go.mod h1:BnBReJLvVYx2CS/UHOgVz2BXKXD9wsQPxZug20nZhd0=
-
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.51.0 h1:6/0iUd0xrnX7qt+mLNRwg5c0PGv8wpE8K90ryANQwMI=
-
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.51.0/go.mod h1:otE2jQekW/PqXk1Awf5lmfokJx4uwuqcj1ab5SpGeW0=
+
github.com/ClickHouse/ch-go v0.66.1 h1:LQHFslfVYZsISOY0dnOYOXGkOUvpv376CCm8g7W74A4=
+
github.com/ClickHouse/ch-go v0.66.1/go.mod h1:NEYcg3aOFv2EmTJfo4m2WF7sHB/YFbLUuIWv9iq76xY=
+
github.com/ClickHouse/clickhouse-go/v2 v2.37.2 h1:wRLNKoynvHQEN4znnVHNLaYnrqVc9sGJmGYg+GGCfto=
+
github.com/ClickHouse/clickhouse-go/v2 v2.37.2/go.mod h1:pH2zrBGp5Y438DMwAxXMm1neSXPPjSI7tD4MURVULw8=
github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b h1:5/++qT1/z812ZqBvqQt6ToRswSuPZ/B33m6xVHRzADU=
github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b/go.mod h1:4+EPqMRApwwE/6yo6CxiHoSnBzjRr3jsqer7frxP8y4=
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 h1:iW0a5ljuFxkLGPNem5Ui+KBjFJzKg4Fv2fnxe4dvzpM=
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5/go.mod h1:Y2QMoi1vgtOIfc+6DhrMOGkLoGzqSV2rKp4Sm+opsyA=
-
github.com/apache/arrow/go/v15 v15.0.2 h1:60IliRbiyTWCWjERBCkO1W4Qun9svcYoZrSLcyOsMLE=
-
github.com/apache/arrow/go/v15 v15.0.2/go.mod h1:DGXsR3ajT524njufqf95822i+KTh+yea1jass9YXgjA=
+
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
+
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de h1:FxWPpzIjnTlhPwqqXc4/vE0f7GvRjuAsbW+HOIe8KnA=
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de/go.mod h1:DCaWoUhZrYW9p1lxo/cm8EmUOOzAPSEZNGF2DK1dJgw=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
···
github.com/carlmjohnson/versioninfo v0.22.5/go.mod h1:QT9mph3wcVfISUKd0i9sZfVrPviHuSF+cUtLjm2WSf8=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
-
github.com/cncf/xds/go v0.0.0-20250121191232-2f005788dc42 h1:Om6kYQYDUk5wWbT0t0q6pvyM49i9XZAv9dDrkDA7gjk=
-
github.com/cncf/xds/go v0.0.0-20250121191232-2f005788dc42/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.3 h1:qMCsGGgs+MAzDFyp9LpAe1Lqy/fY/qCovCm0qnXZOBM=
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
···
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0=
-
github.com/envoyproxy/go-control-plane v0.13.4 h1:zEqyPVyku6IvWCFwux4x9RxkLOMUL+1vC9xUFv5l2/M=
-
github.com/envoyproxy/go-control-plane/envoy v1.32.4 h1:jb83lalDRZSpPWW2Z7Mck/8kXZ5CQAFYVjQcdVIr83A=
-
github.com/envoyproxy/go-control-plane/envoy v1.32.4/go.mod h1:Gzjc5k8JcJswLjAx1Zm+wSYE20UrLtt7JZMWiWQXQEw=
-
github.com/envoyproxy/protoc-gen-validate v1.2.1 h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8=
-
github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
-
github.com/go-jose/go-jose/v4 v4.0.4 h1:VsjPI33J0SB9vQM6PLmNjoHqMQNGPiZ0rHL7Ni7Q6/E=
-
github.com/go-jose/go-jose/v4 v4.0.4/go.mod h1:NKb5HO1EZccyMpiZNbdUw/14tiXNyUJh188dfnMCAfc=
+
github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw=
+
github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw=
+
github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg=
+
github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
···
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0=
-
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
-
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/gocql/gocql v1.7.0 h1:O+7U7/1gSN7QTEAaMEsJc1Oq2QHXvCWoF3DFK9HDHus=
github.com/gocql/gocql v1.7.0/go.mod h1:vnlvXyFZeLBF0Wy+RS8hrOdbn0UWsWtdg07XJnFxZ+4=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
-
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
-
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
+
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
-
github.com/google/flatbuffers v23.5.26+incompatible h1:M9dgRyhJemaM4Sw8+66GHBu8ioaQmyPLg1b8VwK5WJg=
-
github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
+
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
-
github.com/google/martian/v3 v3.3.3 h1:DIhPTQrbPkgs2yJYdXU/eNACCG5DVQjySNRNlflZ9Fc=
-
github.com/google/martian/v3 v3.3.3/go.mod h1:iEPrYcgCF7jA9OtScMFQyAlZZ4YXTKEtJ1E6RWzmBA0=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
-
github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0=
-
github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
-
github.com/googleapis/enterprise-certificate-proxy v0.3.6 h1:GW/XbdyBFQ8Qe+YAmFU9uHLo7OnF5tL52HFAgMmyrf4=
-
github.com/googleapis/enterprise-certificate-proxy v0.3.6/go.mod h1:MkHOF77EYAE7qfSuSS9PU6g4Nt4e11cnsDUowfwewLA=
-
github.com/googleapis/gax-go/v2 v2.14.1 h1:hb0FFeiPaQskmvakKu5EbCbpntQn48jyHuvrkurSS/Q=
-
github.com/googleapis/gax-go/v2 v2.14.1/go.mod h1:Hb/NubMaVM88SrNkvl8X/o8XWwDJEPqouaLeN2IUxoA=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
···
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
···
github.com/miekg/dns v1.1.50/go.mod h1:e3IlAVfNqAllflbibAZEWOXOQ+Ynzk/dDozDxY7XnME=
github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM=
github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8=
+
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/multiformats/go-base32 v0.1.0 h1:pVx9xoSPqEIQG8o+UbAe7DNi51oej1NtK+aGkbLYxPE=
···
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
+
github.com/paulmach/orb v0.11.1 h1:3koVegMC4X/WeiXYz9iswopaTwMem53NzTJuTF20JzU=
+
github.com/paulmach/orb v0.11.1/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU=
+
github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY=
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 h1:1/WtZae0yGtPq+TI6+Tv1WTxkukpXeMlviSxvL7SRgk=
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9/go.mod h1:x3N5drFsm2uilKKuuYo6LdyD8vZAW55sH/9w+pbo1sw=
github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU=
github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
-
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo=
-
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
+
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f h1:VXTQfuJj9vKR4TCkEuWIckKvdHFeJH/huIFJ9/cXOB0=
···
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg=
+
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
+
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
+
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
+
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs=
github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
···
github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
-
github.com/spiffe/go-spiffe/v2 v2.5.0 h1:N2I01KCUkv1FAjZXJMwh95KK1ZIQLYbPfhaxw8WS0hE=
-
github.com/spiffe/go-spiffe/v2 v2.5.0/go.mod h1:P+NxobPc6wXhVtINNtFjNWGBTreew1GBUCwT2wPmb7g=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs=
github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
···
github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11/go.mod h1:Wlo/SzPmxVp6vXpGt/zaXhHH0fn4IxgqZc82aKg6bpQ=
github.com/whyrusleeping/cbor-gen v0.2.1-0.20241030202151-b7a6831be65e h1:28X54ciEwwUxyHn9yrZfl5ojgF4CBNLWX7LR0rvBkf4=
github.com/whyrusleeping/cbor-gen v0.2.1-0.20241030202151-b7a6831be65e/go.mod h1:pM99HXyEbSQHcosHc0iW7YFmwnscr+t9Te4ibko05so=
+
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+
github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
+
github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
+
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
+
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
+
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
-
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
-
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
-
github.com/zeebo/errs v1.4.0 h1:XNdoD/RRMKP7HD0UhJnIzUy74ISdGGxURlYG8HSWSfM=
-
github.com/zeebo/errs v1.4.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
-
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
-
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b h1:CzigHMRySiX3drau9C6Q5CAbNIApmLdat5jPMqChvDA=
gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b/go.mod h1:/y/V339mxv2sZmYYR64O07VuCpdNZqCTwO8ZcouTMI8=
gitlab.com/yawning/tuplehash v0.0.0-20230713102510-df83abbf9a02 h1:qwDnMxjkyLmAFgcfgTnfJrmYKWhHnci3GjDqcZp1M3Q=
gitlab.com/yawning/tuplehash v0.0.0-20230713102510-df83abbf9a02/go.mod h1:JTnUj0mpYiAsuZLmKjTx/ex3AtMowcCgnE7YNyCEP0I=
+
go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
-
go.opentelemetry.io/contrib/detectors/gcp v1.35.0 h1:bGvFt68+KTiAKFlacHW6AhA56GF2rS0bdD3aJYEnmzA=
-
go.opentelemetry.io/contrib/detectors/gcp v1.35.0/go.mod h1:qGWP8/+ILwMRIUf9uIVLloR1uo5ZYAslM4O6OqUi1DA=
-
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0 h1:x7wzEgXfnzJcHDwStJT+mxOz4etr2EcexjqhBvmoakw=
-
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.60.0/go.mod h1:rg+RlpR5dKwaS95IyyZqj5Wd4E13lk/msnTS0Xl9lJM=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 h1:sbiXRNDSWJOTobXh5HyQKjq6wUC5tNybqjIqDpAY4CU=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0/go.mod h1:69uWxva0WgAA/4bu2Yy70SLDBwZXuQ6PbBpbsa5iZrQ=
go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg=
···
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
+
go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0=
+
go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
···
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8=
···
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
+
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw=
golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA=
-
golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI=
-
golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
···
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
···
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0=
···
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhSt0ABwskkZKjD3bXGnZGpNY=
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90=
-
gonum.org/v1/gonum v0.12.0 h1:xKuo6hzt+gMav00meVPUlXwSdoEJP46BR+wdxQEFK2o=
-
gonum.org/v1/gonum v0.12.0/go.mod h1:73TDxJfAAHeA8Mk9mf8NlIppyhQNo5GLTcYeqgo2lvY=
-
google.golang.org/api v0.232.0 h1:qGnmaIMf7KcuwHOlF3mERVzChloDYwRfOJOrHt8YC3I=
-
google.golang.org/api v0.232.0/go.mod h1:p9QCfBWZk1IJETUdbTKloR5ToFdKbYh2fkjsUL6vNoY=
-
google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb h1:ITgPrl429bc6+2ZraNSzMDk3I95nmQln2fuPstKwFDE=
-
google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:sAo5UzpjUwgFBCzupwhcLcxHVDK7vG5IqI30YnwX2eE=
-
google.golang.org/genproto/googleapis/api v0.0.0-20250505200425-f936aa4a68b2 h1:vPV0tzlsK6EzEDHNNH5sa7Hs9bd7iXR7B1tSiPepkV0=
-
google.golang.org/genproto/googleapis/api v0.0.0-20250505200425-f936aa4a68b2/go.mod h1:pKLAc5OolXC3ViWGI62vvC0n10CpwAtRcTNCFwTKBEw=
-
google.golang.org/genproto/googleapis/rpc v0.0.0-20250505200425-f936aa4a68b2 h1:IqsN8hx+lWLqlN+Sc3DoMy/watjofWiU8sRFgQ8fhKM=
-
google.golang.org/genproto/googleapis/rpc v0.0.0-20250505200425-f936aa4a68b2/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
-
google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=
-
google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
+
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+241 -130
handle_create.go
···
package photocopy
import (
+
"bytes"
"context"
"fmt"
+
"slices"
+
"strings"
"time"
"github.com/araddon/dateparse"
"github.com/bluesky-social/indigo/api/bsky"
"github.com/bluesky-social/indigo/atproto/syntax"
+
"github.com/haileyok/photocopy/models"
)
-
func (p *Photocopy) handleCreate(ctx context.Context, recb []byte, indexedAt, rev, did, collection, rkey, cid string) error {
-
_, err := dateparse.ParseAny(indexedAt)
+
func (p *Photocopy) handleCreate(ctx context.Context, recb []byte, indexedAt, rev, did, collection, rkey, cid, seq string) error {
+
iat, err := dateparse.ParseAny(indexedAt)
if err != nil {
return err
+
}
+
+
if err := p.handleCreateRecord(ctx, did, rkey, collection, cid, recb, seq); err != nil {
+
p.logger.Error("error creating record", "error", err)
}
switch collection {
+
case "app.bsky.feed.post":
+
return p.handleCreatePost(ctx, rev, recb, uriFromParts(did, collection, rkey), did, collection, rkey, cid, iat)
+
case "app.bsky.graph.follow":
+
return p.handleCreateFollow(ctx, recb, uriFromParts(did, collection, rkey), did, rkey, iat)
+
case "app.bsky.feed.like", "app.bsky.feed.repost":
+
return p.handleCreateInteraction(ctx, recb, uriFromParts(did, collection, rkey), did, collection, rkey, iat)
+
default:
+
return nil
+
}
+
}
+
+
func (p *Photocopy) handleCreateRecord(ctx context.Context, did, rkey, collection, cid string, raw []byte, seq string) error {
+
var cat time.Time
+
prkey, err := syntax.ParseTID(rkey)
+
if err == nil {
+
cat = prkey.Time()
+
} else {
+
cat = time.Now()
+
}
+
+
rec := models.Record{
+
Did: did,
+
Rkey: rkey,
+
Collection: collection,
+
Cid: cid,
+
Seq: seq,
+
Raw: string(raw),
+
CreatedAt: cat,
+
}
+
+
if err := p.inserters.recordsInserter.Insert(ctx, rec); err != nil {
+
return err
}
return nil
}
-
// func (p *Photocopy) handleCreatePost(ctx context.Context, rev string, recb []byte, uri, did, collection, rkey, cid string, indexedAt time.Time) error {
-
// var rec bsky.FeedPost
-
// if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
-
// return err
-
// }
-
//
-
// jb, err := json.Marshal(rec)
-
// if err != nil {
-
// return err
-
// }
-
//
-
// cat, err := parseTimeFromRecord(rec, rkey)
-
// if err != nil {
-
// return err
-
// }
-
//
-
// bqrec := &BigQueryRecord{
-
// Uri: uri,
-
// AuthorDid: did,
-
// Collection: collection,
-
// Rkey: rkey,
-
// CreatedAt: *cat,
-
// IndexedAt: indexedAt,
-
// Raw: recb,
-
// Json: string(jb),
-
// Cid: cid,
-
// Rev: rev,
-
// }
-
//
-
// if rec.Reply != nil {
-
// if rec.Reply.Parent != nil {
-
// bqrec.ReplyToUri = rec.Reply.Parent.Uri
-
// }
-
// if rec.Reply.Root != nil {
-
// bqrec.InThreadUri = rec.Reply.Root.Uri
-
// }
-
// }
-
//
-
// if err := p.inserters.genericInserter.Insert(ctx, bqrec); err != nil {
-
// return err
-
// }
-
//
-
// return nil
-
// }
+
func (p *Photocopy) handleCreatePost(ctx context.Context, rev string, recb []byte, uri, did, collection, rkey, cid string, indexedAt time.Time) error {
+
var rec bsky.FeedPost
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
+
return err
+
}
+
+
cat, err := parseTimeFromRecord(rec, rkey)
+
if err != nil {
+
return err
+
}
+
+
lang := ""
+
if len(rec.Langs) != 0 {
+
lang = rec.Langs[0]
+
}
+
+
post := models.Post{
+
Uri: uri,
+
Rkey: rkey,
+
CreatedAt: *cat,
+
IndexedAt: indexedAt,
+
Did: did,
+
Lang: lang,
+
Text: rec.Text,
+
}
+
+
if rec.Reply != nil {
+
if rec.Reply.Parent != nil {
+
aturi, err := syntax.ParseATURI(rec.Reply.Parent.Uri)
+
if err != nil {
+
return fmt.Errorf("error parsing at-uri: %w", err)
+
+
}
+
post.ParentDid = aturi.Authority().String()
+
post.ParentUri = rec.Reply.Parent.Uri
+
}
+
if rec.Reply.Root != nil {
+
aturi, err := syntax.ParseATURI(rec.Reply.Root.Uri)
+
if err != nil {
+
return fmt.Errorf("error parsing at-uri: %w", err)
+
+
}
+
post.RootDid = aturi.Authority().String()
+
post.RootUri = rec.Reply.Root.Uri
+
}
+
}
+
+
if rec.Embed != nil && rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil {
+
aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecord.Record.Uri)
+
if err != nil {
+
return fmt.Errorf("error parsing at-uri: %w", err)
+
+
}
+
post.QuoteDid = aturi.Authority().String()
+
post.QuoteUri = rec.Embed.EmbedRecord.Record.Uri
+
} else if rec.Embed != nil && rec.Embed.EmbedRecordWithMedia != nil && rec.Embed.EmbedRecordWithMedia.Record != nil && rec.Embed.EmbedRecordWithMedia.Record.Record != nil {
+
aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecordWithMedia.Record.Record.Uri)
+
if err != nil {
+
return fmt.Errorf("error parsing at-uri: %w", err)
+
+
}
+
post.QuoteDid = aturi.Authority().String()
+
post.QuoteUri = rec.Embed.EmbedRecordWithMedia.Record.Record.Uri
+
}
+
+
if err := p.inserters.postsInserter.Insert(ctx, post); err != nil {
+
return err
+
}
+
+
isEn := slices.Contains(rec.Langs, "en")
+
if rec.Text != "" && rec.Reply == nil && isEn && p.nervanaClient != nil {
+
go func(ctx context.Context, rec bsky.FeedPost, did, rkey string) {
+
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
+
defer cancel()
+
+
nervanaItems, err := p.nervanaClient.MakeRequest(ctx, rec.Text)
+
if err != nil {
+
p.logger.Error("error making nervana items request", "error", err)
+
return
+
}
+
+
for _, ni := range nervanaItems {
+
postLabel := models.PostLabel{
+
Did: did,
+
Rkey: rkey,
+
Text: ni.Text,
+
Label: ni.Label,
+
EntityId: ni.EntityId,
+
Description: ni.Description,
+
Topic: "",
+
CreatedAt: time.Now(),
+
}
+
p.inserters.labelsInserter.Insert(ctx, postLabel)
+
}
+
}(ctx, rec, did, rkey)
+
}
+
+
return nil
+
}
+
+
func (p *Photocopy) handleCreateFollow(ctx context.Context, recb []byte, uri, did, rkey string, indexedAt time.Time) error {
+
var rec bsky.GraphFollow
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
+
return err
+
}
+
+
cat, err := parseTimeFromRecord(rec, rkey)
+
if err != nil {
+
return err
+
}
+
+
follow := models.Follow{
+
Uri: uri,
+
Did: did,
+
Rkey: rkey,
+
CreatedAt: *cat,
+
IndexedAt: indexedAt,
+
Subject: rec.Subject,
+
}
+
+
if err := p.inserters.followsInserter.Insert(ctx, follow); err != nil {
+
return err
+
}
+
+
return nil
+
}
+
+
func (p *Photocopy) handleCreateInteraction(ctx context.Context, recb []byte, uri, did, collection, rkey string, indexedAt time.Time) error {
+
colPts := strings.Split(collection, ".")
+
if len(colPts) < 4 {
+
return fmt.Errorf("invalid collection type %s", collection)
+
}
+
+
interaction := models.Interaction{
+
Uri: uri,
+
Kind: colPts[3],
+
Rkey: rkey,
+
IndexedAt: indexedAt,
+
Did: did,
+
SubjectUri: uri,
+
SubjectDid: did,
+
}
+
+
switch collection {
+
case "app.bsky.feed.like":
+
var rec bsky.FeedLike
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
+
return err
+
}
+
+
cat, err := parseTimeFromRecord(rec, rkey)
+
if err != nil {
+
return err
+
}
+
+
if rec.Subject == nil {
+
return fmt.Errorf("invalid subject in like")
+
}
+
+
aturi, err := syntax.ParseATURI(rec.Subject.Uri)
+
if err != nil {
+
return fmt.Errorf("error parsing at-uri: %w", err)
+
+
}
+
+
interaction.SubjectDid = aturi.Authority().String()
+
interaction.SubjectUri = rec.Subject.Uri
+
interaction.CreatedAt = *cat
+
case "app.bsky.feed.repost":
+
var rec bsky.FeedRepost
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
+
return err
+
}
+
+
cat, err := parseTimeFromRecord(rec, rkey)
+
if err != nil {
+
return err
+
}
+
+
if rec.Subject == nil {
+
return fmt.Errorf("invalid subject in repost")
+
}
+
+
aturi, err := syntax.ParseATURI(rec.Subject.Uri)
+
if err != nil {
+
return fmt.Errorf("error parsing at-uri: %w", err)
+
+
}
+
+
interaction.SubjectDid = aturi.Authority().String()
+
interaction.SubjectUri = rec.Subject.Uri
+
interaction.CreatedAt = *cat
+
}
+
+
if err := p.inserters.interactionsInserter.Insert(ctx, interaction); err != nil {
+
return err
+
}
-
// func (p *Photocopy) handleCreateFollow(ctx context.Context, recb []byte, uri, did, rkey string, indexedAt time.Time) error {
-
// var rec bsky.GraphFollow
-
// if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
-
// return err
-
// }
-
//
-
// cat, err := parseTimeFromRecord(rec, rkey)
-
// if err != nil {
-
// return err
-
// }
-
//
-
// bqrec := &BigQueryFollow{
-
// Uri: uri,
-
// AuthorDid: did,
-
// Rkey: rkey,
-
// CreatedAt: *cat,
-
// IndexedAt: indexedAt,
-
// SubjectDid: rec.Subject,
-
// }
-
//
-
// if err := p.inserters.followsInserter.Insert(ctx, bqrec); err != nil {
-
// return err
-
// }
-
//
-
// return nil
-
// }
-
//
-
// func (p *Photocopy) handleCreateInteraction(ctx context.Context, recb []byte, uri, did, collection, rkey string, indexedAt time.Time) error {
-
// colPts := strings.Split(collection, ".")
-
// if len(colPts) < 4 {
-
// return fmt.Errorf("invalid collection type %s", collection)
-
// }
-
//
-
// bqi := &BigQueryInteraction{
-
// Uri: uri,
-
// Kind: colPts[3],
-
// AuthorDid: did,
-
// Rkey: rkey,
-
// IndexedAt: indexedAt,
-
// }
-
//
-
// switch collection {
-
// case "app.bsky.feed.like":
-
// var rec bsky.FeedLike
-
// if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
-
// return err
-
// }
-
//
-
// cat, err := parseTimeFromRecord(rec, rkey)
-
// if err != nil {
-
// return err
-
// }
-
//
-
// if rec.Subject == nil {
-
// return fmt.Errorf("invalid subject in like")
-
// }
-
//
-
// bqi.SubjectUri = rec.Subject.Uri
-
// bqi.CreatedAt = *cat
-
// case "app.bsky.feed.repost":
-
// var rec bsky.FeedRepost
-
// if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
-
// return err
-
// }
-
//
-
// cat, err := parseTimeFromRecord(rec, rkey)
-
// if err != nil {
-
// return err
-
// }
-
//
-
// if rec.Subject == nil {
-
// return fmt.Errorf("invalid subject in repost")
-
// }
-
//
-
// bqi.SubjectUri = rec.Subject.Uri
-
// bqi.CreatedAt = *cat
-
// }
-
//
-
// if err := p.inserters.interactionsInserter.Insert(ctx, bqi); err != nil {
-
// return err
-
// }
-
//
-
// return nil
-
// }
+
return nil
+
}
func parseTimeFromRecord(rec any, rkey string) (*time.Time, error) {
var rkeyTime time.Time
+13
handle_delete.go
···
import (
"context"
+
"time"
+
+
"github.com/haileyok/photocopy/models"
)
func (p *Photocopy) handleDelete(ctx context.Context, did, collection, rkey string) error {
+
del := models.Delete{
+
Did: did,
+
Rkey: rkey,
+
CreatedAt: time.Now(),
+
}
+
+
if err := p.inserters.deletesInserter.Insert(ctx, del); err != nil {
+
return err
+
}
+
return nil
}
-131
internal/inserter.go
···
-
package inserter
-
-
import (
-
"context"
-
"database/sql/driver"
-
"log/slog"
-
"slices"
-
"sync"
-
"time"
-
-
"github.com/prometheus/client_golang/prometheus"
-
"github.com/prometheus/client_golang/prometheus/promauto"
-
)
-
-
type Inserter struct {
-
db driver.Conn
-
mu sync.Mutex
-
queuedEvents []any
-
batchSize int
-
insertsCounter *prometheus.CounterVec
-
pendingSends prometheus.Gauge
-
histogram *prometheus.HistogramVec
-
logger *slog.Logger
-
prefix string
-
}
-
-
type BaseArgs struct {
-
CredentialsPath string
-
ProjectID string
-
DatasetID string
-
TableID string
-
}
-
-
type Args struct {
-
BaseArgs
-
BatchSize int
-
PrometheusCounterPrefix string
-
Logger *slog.Logger
-
Histogram *prometheus.HistogramVec
-
}
-
-
func New(ctx context.Context, args *Args) (*Inserter, error) {
-
if args.Logger == nil {
-
args.Logger = slog.Default()
-
}
-
-
// dataset := bqc.Dataset(args.DatasetID)
-
// table := dataset.Table(args.TableID)
-
-
inserter := &Inserter{}
-
-
if args.PrometheusCounterPrefix != "" {
-
inserter.insertsCounter = promauto.NewCounterVec(prometheus.CounterOpts{
-
Name: "clickhouse_inserts",
-
Namespace: args.PrometheusCounterPrefix,
-
Help: "total inserts into clickhouse by status",
-
}, []string{"status"})
-
-
inserter.pendingSends = promauto.NewGauge(prometheus.GaugeOpts{
-
Name: "clickhouse_pending_sends",
-
Namespace: args.PrometheusCounterPrefix,
-
Help: "total clickhouse insertions that are in progress",
-
})
-
-
} else {
-
args.Logger.Info("no prometheus prefix provided, no metrics will be registered for this counter", "dataset", args.DatasetID, "table", args.TableID)
-
}
-
-
return inserter, nil
-
}
-
-
func (i *Inserter) Insert(ctx context.Context, e any) error {
-
i.mu.Lock()
-
-
i.queuedEvents = append(i.queuedEvents, e)
-
-
var toInsert []any
-
if len(i.queuedEvents) >= i.batchSize {
-
toInsert = slices.Clone(i.queuedEvents)
-
i.queuedEvents = nil
-
}
-
-
i.mu.Unlock()
-
-
if len(toInsert) > 0 {
-
i.sendStream(ctx, toInsert)
-
}
-
-
return nil
-
}
-
-
func (i *Inserter) Close(ctx context.Context) error {
-
i.mu.Lock()
-
-
var toInsert []any
-
-
if len(i.queuedEvents) > 0 {
-
toInsert = slices.Clone(i.queuedEvents)
-
i.queuedEvents = nil
-
}
-
-
i.mu.Unlock()
-
-
if len(toInsert) > 0 {
-
i.sendStream(ctx, toInsert)
-
}
-
-
return nil
-
}
-
-
func (i *Inserter) sendStream(ctx context.Context, toInsert []any) {
-
i.pendingSends.Inc()
-
defer i.pendingSends.Dec()
-
-
if i.histogram != nil {
-
start := time.Now()
-
defer func() {
-
i.histogram.WithLabelValues(i.prefix).Observe(time.Since(start).Seconds())
-
}()
-
}
-
-
if len(toInsert) == 0 {
-
return
-
}
-
-
status := "ok"
-
-
// TODO: do the insert
-
-
i.insertsCounter.WithLabelValues(status).Add(float64(len(toInsert)))
-
}
+9
models/delete.go
···
+
package models
+
+
import "time"
+
+
type Delete struct {
+
Did string `ch:"did"`
+
Rkey string `ch:"rkey"`
+
CreatedAt time.Time `ch:"created_at"`
+
}
+12
models/follow.go
···
+
package models
+
+
import "time"
+
+
type Follow struct {
+
Uri string `ch:"uri"`
+
Did string `ch:"did"`
+
Rkey string `ch:"rkey"`
+
CreatedAt time.Time `ch:"created_at"`
+
IndexedAt time.Time `ch:"indexed_at"`
+
Subject string `ch:"subject"`
+
}
+14
models/interaction.go
···
+
package models
+
+
import "time"
+
+
type Interaction struct {
+
Uri string `ch:"uri"`
+
Did string `ch:"did"`
+
Rkey string `ch:"rkey"`
+
Kind string `ch:"kind"`
+
CreatedAt time.Time `ch:"created_at"`
+
IndexedAt time.Time `ch:"indexed_at"`
+
SubjectUri string `ch:"subject_uri"`
+
SubjectDid string `ch:"subject_did"`
+
}
+19
models/post.go
···
+
package models
+
+
import "time"
+
+
type Post struct {
+
Uri string `ch:"uri"`
+
Did string `ch:"did"`
+
Rkey string `ch:"rkey"`
+
CreatedAt time.Time `ch:"created_at"`
+
IndexedAt time.Time `ch:"indexed_at"`
+
RootUri string `ch:"root_uri"`
+
RootDid string `ch:"root_did"`
+
ParentUri string `ch:"parent_uri"`
+
ParentDid string `ch:"parent_did"`
+
QuoteUri string `ch:"quote_uri"`
+
QuoteDid string `ch:"quote_did"`
+
Lang string `ch:"lang"`
+
Text string `ch:"text"`
+
}
+14
models/post_label.go
···
+
package models
+
+
import "time"
+
+
type PostLabel struct {
+
Did string `ch:"did"`
+
Rkey string `ch:"rkey"`
+
CreatedAt time.Time `ch:"created_at"`
+
Text string `ch:"text"`
+
Label string `ch:"label"`
+
EntityId string `ch:"entity_id"`
+
Description string `ch:"description"`
+
Topic string `ch:"topic"`
+
}
+13
models/record.go
···
+
package models
+
+
import "time"
+
+
type Record struct {
+
Did string `ch:"did"`
+
Rkey string `ch:"rkey"`
+
Collection string `ch:"collection"`
+
Cid string `ch:"cid"`
+
Seq string `ch:"seq"`
+
Raw string `ch:"raw"`
+
CreatedAt time.Time `ch:"created_at"`
+
}
-1
models.go
···
-
package photocopy
+77
nervana/client.go
···
+
package nervana
+
+
import (
+
"bytes"
+
"context"
+
"encoding/json"
+
"fmt"
+
"io"
+
"net/http"
+
"time"
+
)
+
+
type Client struct {
+
cli *http.Client
+
endpoint string
+
apiKey string
+
}
+
+
func NewClient(endpoint string, apiKey string) *Client {
+
return &Client{
+
cli: &http.Client{
+
Timeout: 5 * time.Second,
+
},
+
endpoint: endpoint,
+
apiKey: apiKey,
+
}
+
}
+
+
type NervanaItem struct {
+
Text string `json:"text"`
+
Label string `json:"label"`
+
EntityId string `json:"entityId"`
+
Description string `json:"description"`
+
}
+
+
func (c *Client) newRequest(ctx context.Context, text string) (*http.Request, error) {
+
payload := map[string]string{
+
"text": text,
+
"language": "en",
+
}
+
+
b, err := json.Marshal(payload)
+
if err != nil {
+
return nil, err
+
}
+
+
req, err := http.NewRequestWithContext(ctx, "GET", c.endpoint, bytes.NewReader(b))
+
+
req.Header.Set("Authorization", "Bearer "+c.apiKey)
+
+
return req, err
+
}
+
+
func (c *Client) MakeRequest(ctx context.Context, text string) ([]NervanaItem, error) {
+
req, err := c.newRequest(ctx, text)
+
if err != nil {
+
return nil, err
+
}
+
+
resp, err := c.cli.Do(req)
+
if err != nil {
+
return nil, err
+
}
+
defer resp.Body.Close()
+
+
if resp.StatusCode != 200 {
+
io.Copy(io.Discard, resp.Body)
+
return nil, fmt.Errorf("received non-200 response code: %d", resp.StatusCode)
+
}
+
+
var nervanaResp []NervanaItem
+
if err := json.NewDecoder(resp.Body).Decode(&nervanaResp); err != nil {
+
return nil, err
+
}
+
+
return nervanaResp, nil
+
}
+188 -12
photocopy.go
···
"sync"
"time"
-
inserter "github.com/haileyok/photocopy/internal"
+
"github.com/ClickHouse/clickhouse-go/v2"
+
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
+
"github.com/haileyok/photocopy/clickhouse_inserter"
+
"github.com/haileyok/photocopy/nervana"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
···
inserters *Inserters
plcScraper *PLCScraper
+
+
ratelimitBypassKey string
+
+
conn driver.Conn
+
+
nervanaClient *nervana.Client
+
nervanaEndpoint string
+
nervanaApiKey string
}
type Inserters struct {
-
followsInserter *inserter.Inserter
-
plcInserter *inserter.Inserter
+
followsInserter *clickhouse_inserter.Inserter
+
interactionsInserter *clickhouse_inserter.Inserter
+
postsInserter *clickhouse_inserter.Inserter
+
plcInserter *clickhouse_inserter.Inserter
+
recordsInserter *clickhouse_inserter.Inserter
+
deletesInserter *clickhouse_inserter.Inserter
+
labelsInserter *clickhouse_inserter.Inserter
}
type Args struct {
···
MetricsAddr string
CursorFile string
PLCScraperCursorFile string
+
ClickhouseAddr string
+
ClickhouseDatabase string
ClickhouseUser string
ClickhousePass string
+
RatelimitBypassKey string
+
NervanaEndpoint string
+
NervanaApiKey string
}
func New(ctx context.Context, args *Args) (*Photocopy, error) {
+
conn, err := clickhouse.Open(&clickhouse.Options{
+
Addr: []string{args.ClickhouseAddr},
+
Auth: clickhouse.Auth{
+
Database: args.ClickhouseDatabase,
+
Username: args.ClickhouseUser,
+
Password: args.ClickhousePass,
+
},
+
})
+
if err != nil {
+
return nil, err
+
}
+
p := &Photocopy{
-
logger: args.Logger,
-
metricsAddr: args.MetricsAddr,
-
relayHost: args.RelayHost,
-
wg: sync.WaitGroup{},
-
cursorFile: args.CursorFile,
+
logger: args.Logger,
+
metricsAddr: args.MetricsAddr,
+
relayHost: args.RelayHost,
+
wg: sync.WaitGroup{},
+
cursorFile: args.CursorFile,
+
ratelimitBypassKey: args.RatelimitBypassKey,
+
conn: conn,
}
insertionsHist := promauto.NewHistogramVec(prometheus.HistogramOpts{
···
Buckets: prometheus.ExponentialBucketsRange(0.0001, 30, 20),
}, []string{"type"})
-
fi, err := inserter.New(ctx, &inserter.Args{
+
fi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
PrometheusCounterPrefix: "photocopy_follows",
Histogram: insertionsHist,
+
BatchSize: 500,
+
Logger: p.logger,
+
Conn: conn,
+
Query: "INSERT INTO follow (uri, did, rkey, created_at, indexed_at, subject)",
+
RateLimit: 3,
+
})
+
if err != nil {
+
return nil, err
+
}
+
+
pi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
+
PrometheusCounterPrefix: "photocopy_posts",
+
Histogram: insertionsHist,
+
BatchSize: 300,
+
Logger: p.logger,
+
Conn: conn,
+
Query: "INSERT INTO post (uri, did, rkey, created_at, indexed_at, root_uri, root_did, parent_uri, parent_did, quote_uri, quote_did, lang)",
+
RateLimit: 3,
+
})
+
if err != nil {
+
return nil, err
+
}
+
+
ii, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
+
PrometheusCounterPrefix: "photocopy_interactions",
+
Histogram: insertionsHist,
+
BatchSize: 1000,
+
Logger: p.logger,
+
Conn: conn,
+
Query: "INSERT INTO interaction (uri, did, rkey, kind, created_at, indexed_at, subject_uri, subject_did)",
+
RateLimit: 3,
+
})
+
if err != nil {
+
return nil, err
+
}
+
+
ri, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
+
PrometheusCounterPrefix: "photocopy_records",
+
Histogram: insertionsHist,
+
BatchSize: 2500,
+
Logger: p.logger,
+
Conn: conn,
+
Query: "INSERT INTO record (did, rkey, collection, cid, seq, raw, created_at)",
+
RateLimit: 3,
+
})
+
if err != nil {
+
return nil, err
+
}
+
+
di, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
+
PrometheusCounterPrefix: "photocopy_deletes",
+
Histogram: insertionsHist,
+
BatchSize: 500,
+
Logger: p.logger,
+
Conn: conn,
+
Query: "INSERT INTO delete (did, rkey, created_at)",
+
RateLimit: 3,
+
})
+
if err != nil {
+
return nil, err
+
}
+
+
li, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
+
PrometheusCounterPrefix: "photocopy_labels",
+
Histogram: insertionsHist,
BatchSize: 100,
Logger: p.logger,
+
Conn: conn,
+
Query: "INSERT INTO post_label (did, rkey, text, label, entity_id, description, topic, created_at)",
+
RateLimit: 3,
})
if err != nil {
return nil, err
}
is := &Inserters{
-
followsInserter: fi,
+
followsInserter: fi,
+
postsInserter: pi,
+
interactionsInserter: ii,
+
recordsInserter: ri,
+
deletesInserter: di,
+
labelsInserter: li,
}
p.inserters = is
-
plci, err := inserter.New(ctx, &inserter.Args{
+
plci, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
PrometheusCounterPrefix: "photocopy_plc_entries",
Histogram: insertionsHist,
BatchSize: 100,
Logger: args.Logger,
+
Conn: conn,
+
Query: `INSERT INTO plc (
+
did, cid, nullified, created_at, plc_op_sig, plc_op_prev, plc_op_type,
+
plc_op_services, plc_op_also_known_as, plc_op_rotation_keys,
+
plc_tomb_sig, plc_tomb_prev, plc_tomb_type,
+
legacy_op_sig, legacy_op_prev, legacy_op_type, legacy_op_handle,
+
legacy_op_service, legacy_op_signing_key, legacy_op_recovery_key
+
)`,
})
if err != nil {
return nil, err
···
p.inserters.plcInserter = plci
p.plcScraper = plcs
+
if args.NervanaApiKey != "" && args.NervanaEndpoint != "" {
+
p.nervanaClient = nervana.NewClient(args.NervanaEndpoint, args.NervanaApiKey)
+
}
+
return p, nil
}
-
func (p *Photocopy) Run(baseCtx context.Context) error {
+
func (p *Photocopy) Run(baseCtx context.Context, withBackfill bool) error {
ctx, cancel := context.WithCancel(baseCtx)
metricsServer := http.NewServeMux()
···
}
}(ctx)
+
if withBackfill {
+
go func(ctx context.Context) {
+
if err := p.runBackfiller(ctx); err != nil {
+
panic(fmt.Errorf("error starting backfiller: %w", err))
+
}
+
}(ctx)
+
}
+
<-ctx.Done()
if p.inserters != nil {
···
return
}
p.logger.Info("follows inserter closed")
+
}()
+
}
+
+
if p.inserters.interactionsInserter != nil {
+
p.wg.Add(1)
+
go func() {
+
defer p.wg.Done()
+
if err := p.inserters.interactionsInserter.Close(ctx); err != nil {
+
p.logger.Error("failed to close interactions inserter", "error", err)
+
return
+
}
+
p.logger.Info("interactions inserter closed")
+
}()
+
}
+
+
if p.inserters.postsInserter != nil {
+
p.wg.Add(1)
+
go func() {
+
defer p.wg.Done()
+
if err := p.inserters.postsInserter.Close(ctx); err != nil {
+
p.logger.Error("failed to close posts inserter", "error", err)
+
return
+
}
+
p.logger.Info("posts inserter closed")
+
}()
+
}
+
+
if p.inserters.recordsInserter != nil {
+
p.wg.Add(1)
+
go func() {
+
defer p.wg.Done()
+
if err := p.inserters.recordsInserter.Close(ctx); err != nil {
+
p.logger.Error("failed to close records inserter", "error", err)
+
return
+
}
+
p.logger.Info("records inserter closed")
+
}()
+
}
+
+
if p.inserters.deletesInserter != nil {
+
p.wg.Add(1)
+
go func() {
+
defer p.wg.Done()
+
if err := p.inserters.deletesInserter.Close(ctx); err != nil {
+
p.logger.Error("failed to close deletes inserter", "error", err)
+
return
+
}
+
p.logger.Info("deletes inserter closed")
}()
}
+78 -58
plc_models.go
···
import (
"encoding/json"
-
"errors"
"fmt"
"time"
-
-
"cloud.google.com/go/bigquery"
)
type PLCEntry struct {
-
Did string `json:"did" bigquery:"did"`
-
Operation PLCOperationType `json:"operation" bigquery:"operation"`
-
Cid string `json:"cid" bigquery:"cid"`
-
Nullified bool `json:"nullified" bigquery:"nullified"`
-
CreatedAt time.Time `json:"createdAt" bigquery:"created_at"`
+
Did string `json:"did"`
+
Operation PLCOperationType `json:"operation"`
+
Cid string `json:"cid"`
+
Nullified bool `json:"nullified"`
+
CreatedAt time.Time `json:"createdAt"`
}
type PLCOperation struct {
-
Sig string `json:"sig" bigquery:"sig"`
-
Prev bigquery.NullString `json:"prev" bigquery:"prev"`
-
Type string `json:"type" bigquery:"type"`
-
Services map[string]PLCService `json:"services" bigquery:"-"`
-
ServicesJSON string `json:"-" bigquery:"services"`
-
AlsoKnownAs []string `json:"alsoKnownAs" bigquery:"also_known_as"`
-
RotationKeys []string `json:"rotationKeys" bigquery:"rotation_keys"`
-
VerificationMethods map[string]string `json:"verificationMethods" bigquery:"-"`
-
VerificationMethodsJSON string `json:"-" bigquery:"verification_methods"`
+
Sig string `json:"sig" `
+
Prev *string `json:"prev"`
+
Type string `json:"type"`
+
Services map[string]PLCService `json:"services"`
+
AlsoKnownAs []string `json:"alsoKnownAs"`
+
RotationKeys []string `json:"rotationKeys"`
+
VerificationMethods map[string]string `json:"verificationMethods"`
}
type PLCTombstone struct {
···
}
type PLCOperationType struct {
-
OperationType string `json:"-" bigquery:"operation_type"`
-
PLCOperation *PLCOperation `json:"-" bigquery:"plc_operation"`
-
PLCTombstone *PLCTombstone `json:"-" bigquery:"plc_tombstone"`
-
LegacyPLCOperation *LegacyPLCOperation `json:"-" bigquery:"legacy_plc_operation"`
+
OperationType string
+
PLCOperation *PLCOperation
+
PLCTombstone *PLCTombstone
+
LegacyPLCOperation *LegacyPLCOperation
+
}
+
+
type ClickhousePLCEntry struct {
+
Did string `ch:"did"`
+
Cid string `ch:"cid"`
+
Nullified bool `ch:"nullified"`
+
CreatedAt time.Time `ch:"created_at"`
+
PlcOpSig string `ch:"plc_op_sig"`
+
PlcOpPrev string `ch:"plc_op_prev"`
+
PlcOpType string `ch:"plc_op_type"`
+
PlcOpServices []string `ch:"plc_op_services"`
+
PlcOpAlsoKnownAs []string `ch:"plc_op_also_known_as"`
+
PlcOpRotationKeys []string `ch:"plc_op_rotation_keys"`
+
PlcTombSig string `ch:"plc_tomb_sig"`
+
PlcTombPrev string `ch:"plc_tomb_prev"`
+
PlcTombType string `ch:"plc_tomb_type"`
+
LegacyOpSig string `ch:"legacy_op_sig"`
+
LegacyOpPrev string `ch:"legacy_op_prev"`
+
LegacyOpType string `ch:"legacy_op_type"`
+
LegacyOpHandle string `ch:"legacy_op_handle"`
+
LegacyOpService string `ch:"legacy_op_service"`
+
LegacyOpSigningKey string `ch:"legacy_op_signing_key"`
+
LegacyOpRecoveryKey string `ch:"legacy_op_recovery_key"`
}
func (o *PLCOperationType) UnmarshalJSON(data []byte) error {
···
return nil
}
-
func (o *PLCOperationType) MarshalJSON() ([]byte, error) {
-
if o.PLCOperation != nil {
-
return json.Marshal(o.PLCOperation)
-
}
-
if o.PLCTombstone != nil {
-
return json.Marshal(o.PLCTombstone)
-
}
-
if o.LegacyPLCOperation != nil {
-
return json.Marshal(o.LegacyPLCOperation)
-
}
-
return nil, errors.New("no valid operation type found")
-
}
-
-
func (o *PLCOperationType) Value() (any, error) {
-
return json.Marshal(o)
-
}
-
-
func (o *PLCOperationType) Scan(value any) error {
-
bytes, ok := value.([]byte)
-
if !ok {
-
return errors.New("could not scan PLCOperationType")
+
func (e *PLCEntry) prepareForClickhouse() (*ClickhousePLCEntry, error) {
+
che := &ClickhousePLCEntry{
+
Did: e.Did,
+
Cid: e.Cid,
+
Nullified: e.Nullified,
+
CreatedAt: e.CreatedAt,
}
-
return json.Unmarshal(bytes, o)
-
}
-
-
func (e *PLCEntry) prepareForBigQuery() (map[string]bigquery.Value, string, error) {
if e.Operation.PLCOperation != nil {
-
if e.Operation.PLCOperation.Services != nil {
-
b, err := json.Marshal(e.Operation.PLCOperation.Services)
-
if err != nil {
-
return nil, "", fmt.Errorf("error marshaling services: %w", err)
-
}
-
e.Operation.PLCOperation.ServicesJSON = string(b)
+
pop := e.Operation.PLCOperation
+
che.PlcOpSig = pop.Sig
+
if pop.Prev != nil {
+
che.PlcOpPrev = *pop.Prev
}
-
if e.Operation.PLCOperation.VerificationMethods != nil {
-
b, err := json.Marshal(e.Operation.PLCOperation.VerificationMethods)
-
if err != nil {
-
return nil, "", fmt.Errorf("error marshaling verification methods: %w", err)
+
che.PlcOpType = pop.Type
+
services := []string{}
+
for _, s := range pop.Services {
+
services = append(services, s.Endpoint)
+
}
+
che.PlcOpServices = services
+
che.PlcOpAlsoKnownAs = pop.AlsoKnownAs
+
che.PlcOpRotationKeys = pop.RotationKeys
+
if e.Operation.PLCOperation.Services == nil {
+
for _, s := range e.Operation.PLCOperation.Services {
+
che.PlcOpServices = append(che.PlcOpServices, s.Endpoint)
}
-
e.Operation.PLCOperation.VerificationMethodsJSON = string(b)
}
+
return che, nil
+
} else if e.Operation.PLCTombstone != nil {
+
che.PlcTombSig = e.Operation.PLCTombstone.Sig
+
che.PlcTombPrev = e.Operation.PLCTombstone.Prev
+
che.PlcTombType = e.Operation.PLCTombstone.Type
+
return che, nil
+
} else if e.Operation.LegacyPLCOperation != nil {
+
lop := e.Operation.LegacyPLCOperation
+
che.LegacyOpSig = lop.Sig
+
che.LegacyOpPrev = lop.Prev
+
che.LegacyOpType = lop.Type
+
che.LegacyOpService = lop.Service
+
che.LegacyOpHandle = lop.Handle
+
che.LegacyOpSigningKey = lop.SigningKey
+
che.LegacyOpRecoveryKey = lop.RecoveryKey
+
return che, nil
}
-
return nil, "", nil
+
+
return nil, fmt.Errorf("no valid plc operation type")
}
+10 -6
plc_scraper.go
···
"strings"
"time"
-
inserter "github.com/haileyok/photocopy/internal"
+
"github.com/haileyok/photocopy/clickhouse_inserter"
)
type PLCScraper struct {
···
logger *slog.Logger
cursor string
cursorFile string
-
inserter *inserter.Inserter
+
inserter *clickhouse_inserter.Inserter
}
type PLCScraperArgs struct {
Logger *slog.Logger
-
Inserter *inserter.Inserter
+
Inserter *clickhouse_inserter.Inserter
CursorFile string
}
···
ustr += "&after=" + s.cursor
t, _ := time.Parse(time.RFC3339Nano, s.cursor)
if time.Since(t) > 1*time.Hour {
-
setTickerDuration(600 * time.Millisecond)
+
setTickerDuration(800 * time.Millisecond)
} else {
setTickerDuration(3 * time.Second)
}
···
s.saveCursor(s.cursor)
}
-
entry.prepareForBigQuery()
+
chEntry, err := entry.prepareForClickhouse()
+
if err != nil {
+
s.logger.Error("error getting clickhouse entry from plc entry", "error", err)
+
continue
+
}
-
s.inserter.Insert(ctx, entry)
+
s.inserter.Insert(ctx, *chEntry)
}
}