this repo has no description

feat: consume interactions

+175 -129
handle_create.go
···
package photocopy
import (
+
"bytes"
"context"
"fmt"
+
"strings"
"time"
"github.com/araddon/dateparse"
"github.com/bluesky-social/indigo/api/bsky"
"github.com/bluesky-social/indigo/atproto/syntax"
+
"github.com/haileyok/photocopy/models"
)
func (p *Photocopy) handleCreate(ctx context.Context, recb []byte, indexedAt, rev, did, collection, rkey, cid string) error {
-
_, err := dateparse.ParseAny(indexedAt)
+
+
iat, err := dateparse.ParseAny(indexedAt)
if err != nil {
return err
}
switch collection {
+
case "app.bsky.feed.post":
+
return p.handleCreatePost(ctx, rev, recb, uriFromParts(did, collection, rkey), did, collection, rkey, cid, iat)
+
case "app.bsky.graph.follow":
+
return p.handleCreateFollow(ctx, recb, uriFromParts(did, collection, rkey), did, rkey, iat)
+
case "app.bsky.feed.like", "app.bsky.feed.repost":
+
return p.handleCreateInteraction(ctx, recb, uriFromParts(did, collection, rkey), did, collection, rkey, iat)
+
default:
+
return nil
+
}
+
}
+
+
func (p *Photocopy) handleCreatePost(ctx context.Context, rev string, recb []byte, uri, did, collection, rkey, cid string, indexedAt time.Time) error {
+
var rec bsky.FeedPost
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
+
return err
+
}
+
+
cat, err := parseTimeFromRecord(rec, rkey)
+
if err != nil {
+
return err
+
}
+
+
post := &models.Post{
+
Uri: uri,
+
Rkey: rkey,
+
CreatedAt: *cat,
+
IndexedAt: indexedAt,
+
Did: did,
+
}
+
+
if rec.Reply != nil {
+
if rec.Reply.Parent != nil {
+
aturi, err := syntax.ParseATURI(rec.Reply.Parent.Uri)
+
if err != nil {
+
return fmt.Errorf("error parsing at-uri: %w", err)
+
+
}
+
post.ParentDid = aturi.Authority().String()
+
post.ParentUri = rec.Reply.Parent.Uri
+
}
+
if rec.Reply.Root != nil {
+
aturi, err := syntax.ParseATURI(rec.Reply.Root.Uri)
+
if err != nil {
+
return fmt.Errorf("error parsing at-uri: %w", err)
+
+
}
+
post.RootDid = aturi.Authority().String()
+
post.RootUri = rec.Reply.Root.Uri
+
}
+
}
+
+
if rec.Embed != nil && rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil {
+
aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecord.Record.Uri)
+
if err != nil {
+
return fmt.Errorf("error parsing at-uri: %w", err)
+
+
}
+
post.QuoteDid = aturi.Authority().String()
+
post.QuoteUri = rec.Embed.EmbedRecord.Record.Uri
+
} else if rec.Embed != nil && rec.Embed.EmbedRecordWithMedia != nil && rec.Embed.EmbedRecordWithMedia.Record != nil && rec.Embed.EmbedRecordWithMedia.Record.Record != nil {
+
aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecordWithMedia.Record.Record.Uri)
+
if err != nil {
+
return fmt.Errorf("error parsing at-uri: %w", err)
+
+
}
+
post.QuoteDid = aturi.Authority().String()
+
post.QuoteUri = rec.Embed.EmbedRecordWithMedia.Record.Record.Uri
+
}
+
+
if err := p.inserters.postsInserter.Insert(ctx, post); err != nil {
+
return err
}
return nil
}
-
// func (p *Photocopy) handleCreatePost(ctx context.Context, rev string, recb []byte, uri, did, collection, rkey, cid string, indexedAt time.Time) error {
-
// var rec bsky.FeedPost
-
// if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
-
// return err
-
// }
-
//
-
// jb, err := json.Marshal(rec)
-
// if err != nil {
-
// return err
-
// }
-
//
-
// cat, err := parseTimeFromRecord(rec, rkey)
-
// if err != nil {
-
// return err
-
// }
-
//
-
// bqrec := &BigQueryRecord{
-
// Uri: uri,
-
// AuthorDid: did,
-
// Collection: collection,
-
// Rkey: rkey,
-
// CreatedAt: *cat,
-
// IndexedAt: indexedAt,
-
// Raw: recb,
-
// Json: string(jb),
-
// Cid: cid,
-
// Rev: rev,
-
// }
-
//
-
// if rec.Reply != nil {
-
// if rec.Reply.Parent != nil {
-
// bqrec.ReplyToUri = rec.Reply.Parent.Uri
-
// }
-
// if rec.Reply.Root != nil {
-
// bqrec.InThreadUri = rec.Reply.Root.Uri
-
// }
-
// }
-
//
-
// if err := p.inserters.genericInserter.Insert(ctx, bqrec); err != nil {
-
// return err
-
// }
-
//
-
// return nil
-
// }
+
func (p *Photocopy) handleCreateFollow(ctx context.Context, recb []byte, uri, did, rkey string, indexedAt time.Time) error {
+
var rec bsky.GraphFollow
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
+
return err
+
}
-
// func (p *Photocopy) handleCreateFollow(ctx context.Context, recb []byte, uri, did, rkey string, indexedAt time.Time) error {
-
// var rec bsky.GraphFollow
-
// if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
-
// return err
-
// }
-
//
-
// cat, err := parseTimeFromRecord(rec, rkey)
-
// if err != nil {
-
// return err
-
// }
-
//
-
// bqrec := &BigQueryFollow{
-
// Uri: uri,
-
// AuthorDid: did,
-
// Rkey: rkey,
-
// CreatedAt: *cat,
-
// IndexedAt: indexedAt,
-
// SubjectDid: rec.Subject,
-
// }
-
//
-
// if err := p.inserters.followsInserter.Insert(ctx, bqrec); err != nil {
-
// return err
-
// }
-
//
-
// return nil
-
// }
-
//
-
// func (p *Photocopy) handleCreateInteraction(ctx context.Context, recb []byte, uri, did, collection, rkey string, indexedAt time.Time) error {
-
// colPts := strings.Split(collection, ".")
-
// if len(colPts) < 4 {
-
// return fmt.Errorf("invalid collection type %s", collection)
-
// }
-
//
-
// bqi := &BigQueryInteraction{
-
// Uri: uri,
-
// Kind: colPts[3],
-
// AuthorDid: did,
-
// Rkey: rkey,
-
// IndexedAt: indexedAt,
-
// }
-
//
-
// switch collection {
-
// case "app.bsky.feed.like":
-
// var rec bsky.FeedLike
-
// if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
-
// return err
-
// }
-
//
-
// cat, err := parseTimeFromRecord(rec, rkey)
-
// if err != nil {
-
// return err
-
// }
-
//
-
// if rec.Subject == nil {
-
// return fmt.Errorf("invalid subject in like")
-
// }
-
//
-
// bqi.SubjectUri = rec.Subject.Uri
-
// bqi.CreatedAt = *cat
-
// case "app.bsky.feed.repost":
-
// var rec bsky.FeedRepost
-
// if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
-
// return err
-
// }
-
//
-
// cat, err := parseTimeFromRecord(rec, rkey)
-
// if err != nil {
-
// return err
-
// }
-
//
-
// if rec.Subject == nil {
-
// return fmt.Errorf("invalid subject in repost")
-
// }
-
//
-
// bqi.SubjectUri = rec.Subject.Uri
-
// bqi.CreatedAt = *cat
-
// }
-
//
-
// if err := p.inserters.interactionsInserter.Insert(ctx, bqi); err != nil {
-
// return err
-
// }
-
//
-
// return nil
-
// }
+
cat, err := parseTimeFromRecord(rec, rkey)
+
if err != nil {
+
return err
+
}
+
+
follow := &models.Follow{
+
Uri: uri,
+
Did: did,
+
Rkey: rkey,
+
CreatedAt: *cat,
+
IndexedAt: indexedAt,
+
Subject: rec.Subject,
+
}
+
+
if err := p.inserters.followsInserter.Insert(ctx, follow); err != nil {
+
return err
+
}
+
+
return nil
+
}
+
+
func (p *Photocopy) handleCreateInteraction(ctx context.Context, recb []byte, uri, did, collection, rkey string, indexedAt time.Time) error {
+
colPts := strings.Split(collection, ".")
+
if len(colPts) < 4 {
+
return fmt.Errorf("invalid collection type %s", collection)
+
}
+
+
interaction := &models.Interaction{
+
Uri: uri,
+
Kind: colPts[3],
+
Rkey: rkey,
+
IndexedAt: indexedAt,
+
Did: did,
+
SubjectUri: uri,
+
SubjectDid: did,
+
}
+
+
switch collection {
+
case "app.bsky.feed.like":
+
var rec bsky.FeedLike
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
+
return err
+
}
+
+
cat, err := parseTimeFromRecord(rec, rkey)
+
if err != nil {
+
return err
+
}
+
+
if rec.Subject == nil {
+
return fmt.Errorf("invalid subject in like")
+
}
+
+
aturi, err := syntax.ParseATURI(rec.Subject.Uri)
+
if err != nil {
+
return fmt.Errorf("error parsing at-uri: %w", err)
+
+
}
+
+
interaction.SubjectDid = aturi.Authority().String()
+
interaction.SubjectUri = rec.Subject.Uri
+
interaction.CreatedAt = *cat
+
case "app.bsky.feed.repost":
+
var rec bsky.FeedRepost
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
+
return err
+
}
+
+
cat, err := parseTimeFromRecord(rec, rkey)
+
if err != nil {
+
return err
+
}
+
+
if rec.Subject == nil {
+
return fmt.Errorf("invalid subject in repost")
+
}
+
+
aturi, err := syntax.ParseATURI(rec.Subject.Uri)
+
if err != nil {
+
return fmt.Errorf("error parsing at-uri: %w", err)
+
+
}
+
+
interaction.SubjectDid = aturi.Authority().String()
+
interaction.SubjectUri = rec.Subject.Uri
+
interaction.CreatedAt = *cat
+
}
+
+
if err := p.inserters.interactionsInserter.Insert(ctx, interaction); err != nil {
+
return err
+
}
+
+
return nil
+
}
func parseTimeFromRecord(rec any, rkey string) (*time.Time, error) {
var rkeyTime time.Time
+12
models/follow.go
···
+
package models
+
+
import "time"
+
+
type Follow struct {
+
Uri string `ch:"uri"`
+
Did string `ch:"did"`
+
Rkey string `ch:"rkey"`
+
CreatedAt time.Time `ch:"created_at"`
+
IndexedAt time.Time `ch:"indexed_at"`
+
Subject string `ch:"subject"`
+
}
+14
models/interaction.go
···
+
package models
+
+
import "time"
+
+
type Interaction struct {
+
Uri string `ch:"like"`
+
Did string `ch:"did"`
+
Rkey string `ch:"rkey"`
+
Kind string `ch:"kind"`
+
CreatedAt time.Time `ch:"created_at"`
+
IndexedAt time.Time `ch:"indexed_at"`
+
SubjectUri string `ch:"subject_uri"`
+
SubjectDid string `ch:"subject_did"`
+
}
+17
models/post.go
···
+
package models
+
+
import "time"
+
+
type Post struct {
+
Uri string `ch:"uri"`
+
Did string `ch:"did"`
+
Rkey string `ch:"rkey"`
+
CreatedAt time.Time `ch:"created_at"`
+
IndexedAt time.Time `ch:"indexed_at"`
+
RootUri string `ch:"root"`
+
RootDid string `ch:"root_did"`
+
ParentUri string `ch:"reply"`
+
ParentDid string `ch:"reply_did"`
+
QuoteUri string `ch:"quote_uri"`
+
QuoteDid string `ch:"quote_did"`
+
}
+32 -4
photocopy.go
···
}
type Inserters struct {
-
followsInserter *clickhouse_inserter.Inserter
-
plcInserter *clickhouse_inserter.Inserter
+
followsInserter *clickhouse_inserter.Inserter
+
interactionsInserter *clickhouse_inserter.Inserter
+
postsInserter *clickhouse_inserter.Inserter
+
plcInserter *clickhouse_inserter.Inserter
}
type Args struct {
···
fi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
PrometheusCounterPrefix: "photocopy_follows",
Histogram: insertionsHist,
+
BatchSize: 1000,
+
Logger: p.logger,
+
Conn: conn,
+
Query: "INSERT INTO follow (uri, did, rkey, created_at, indexed_at, subject)",
+
})
+
if err != nil {
+
return nil, err
+
}
+
+
pi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
+
PrometheusCounterPrefix: "photocopy_posts",
+
Histogram: insertionsHist,
BatchSize: 100,
Logger: p.logger,
Conn: conn,
-
Query: "",
+
Query: "INSERT INTO post (uri, did, rkey, created_at, indexed_at, root_uri, root_did, parent_uri, parent_did, quote_uri, quote_did)",
+
})
+
if err != nil {
+
return nil, err
+
}
+
+
ii, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{
+
PrometheusCounterPrefix: "photocopy_interactions",
+
Histogram: insertionsHist,
+
BatchSize: 1000,
+
Logger: p.logger,
+
Conn: conn,
+
Query: "INSERT INTO interaction (uri, did, rkey, kind, created_at, indexed_at, subject_uri, subject_did)",
})
if err != nil {
return nil, err
}
is := &Inserters{
-
followsInserter: fi,
+
followsInserter: fi,
+
postsInserter: pi,
+
interactionsInserter: ii,
}
p.inserters = is