this repo has no description

feat: insert all records

+1 -10
consumer.go
···
"net/http"
"net/url"
"os"
-
"strings"
"time"
"github.com/bluesky-social/indigo/api/atproto"
···
continue
}
-
if !strings.HasPrefix(collection.String(), "app.bsky.") {
-
continue
-
}
-
-
if err := p.handleCreate(ctx, *rec, evt.Time, evt.Rev, did.String(), collection.String(), rkey.String(), reccid.String()); err != nil {
+
if err := p.handleCreate(ctx, *rec, evt.Time, evt.Rev, did.String(), collection.String(), rkey.String(), reccid.String(), fmt.Sprintf("%d", evt.Seq)); err != nil {
p.logger.Error("error handling create event", "error", err)
continue
}
case repomgr.EvtKindDeleteRecord:
-
if !strings.HasPrefix(collection.String(), "app.bsky.") {
-
continue
-
}
-
if err := p.handleDelete(ctx, did.String(), collection.String(), rkey.String()); err != nil {
p.logger.Error("error handling delete event", "error", err)
continue
+31 -2
handle_create.go
···
"github.com/haileyok/photocopy/models"
)
-
func (p *Photocopy) handleCreate(ctx context.Context, recb []byte, indexedAt, rev, did, collection, rkey, cid string) error {
-
+
func (p *Photocopy) handleCreate(ctx context.Context, recb []byte, indexedAt, rev, did, collection, rkey, cid, seq string) error {
iat, err := dateparse.ParseAny(indexedAt)
if err != nil {
return err
+
}
+
+
if err := p.handleCreateRecord(ctx, did, rkey, collection, cid, recb, seq); err != nil {
+
p.logger.Error("error creating record", "error", err)
}
switch collection {
···
default:
return nil
}
+
}
+
+
func (p *Photocopy) handleCreateRecord(ctx context.Context, did, rkey, collection, cid string, raw []byte, seq string) error {
+
var cat time.Time
+
prkey, err := syntax.ParseTID(rkey)
+
if err == nil {
+
cat = prkey.Time()
+
} else {
+
cat = time.Now()
+
}
+
+
rec := models.Record{
+
Did: did,
+
Rkey: rkey,
+
Collection: collection,
+
Cid: cid,
+
Seq: seq,
+
Raw: raw,
+
CreatedAt: cat,
+
}
+
+
if err := p.inserters.recordsInserter.Insert(ctx, rec); err != nil {
+
return err
+
}
+
+
return nil
}
func (p *Photocopy) handleCreatePost(ctx context.Context, rev string, recb []byte, uri, did, collection, rkey, cid string, indexedAt time.Time) error {
+13
handle_delete.go
···
import (
"context"
+
"time"
+
+
"github.com/haileyok/photocopy/models"
)
func (p *Photocopy) handleDelete(ctx context.Context, did, collection, rkey string) error {
+
del := models.Delete{
+
Did: did,
+
Rkey: rkey,
+
CreatedAt: time.Now(),
+
}
+
+
if err := p.inserters.deletesInserter.Insert(ctx, del); err != nil {
+
return err
+
}
+
return nil
}
-1
models.go
···
-
package photocopy
+9
models/delete.go
···
+
package models
+
+
import "time"
+
+
type Delete struct {
+
Did string `ch:"did"`
+
Rkey string `ch:"rkey"`
+
CreatedAt time.Time `ch:"created_at"`
+
}
+13
models/record.go
···
+
package models
+
+
import "time"
+
+
type Record struct {
+
Did string `ch:"did"`
+
Rkey string `ch:"rkey"`
+
Collection string `ch:"collection"`
+
Cid string `ch:"cid"`
+
Seq string `ch:"seq"`
+
Raw []byte `ch:"raw"`
+
CreatedAt time.Time
+
}
+28
photocopy.go
···
interactionsInserter *clickhouse_inserter.Inserter
postsInserter *clickhouse_inserter.Inserter
plcInserter *clickhouse_inserter.Inserter
+
recordsInserter *clickhouse_inserter.Inserter
+
deletesInserter *clickhouse_inserter.Inserter
}
type Args struct {
···
return nil, err
}
+
ri, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
+
PrometheusCounterPrefix: "photocopy_records",
+
Histogram: insertionsHist,
+
BatchSize: 1000,
+
Logger: p.logger,
+
Conn: conn,
+
Query: "INSERT INTO records (did, rkey, collection, cid, seq, raw, created_at)",
+
})
+
if err != nil {
+
return nil, err
+
}
+
+
di, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
+
PrometheusCounterPrefix: "photocopy_records",
+
Histogram: insertionsHist,
+
BatchSize: 1000,
+
Logger: p.logger,
+
Conn: conn,
+
Query: "INSERT INTO deletes (did, rkey, created_at)",
+
})
+
if err != nil {
+
return nil, err
+
}
+
is := &Inserters{
followsInserter: fi,
postsInserter: pi,
interactionsInserter: ii,
+
recordsInserter: ri,
+
deletesInserter: di,
}
p.inserters = is