···
···
"tangled.sh/tangled.sh/core/workflow"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/posthog/posthog-go"
···
cfg := ec.ConsumerConfig{
-
ProcessFunc: knotIngester(ctx, d, enforcer, posthog, c.Core.Dev),
RetryInterval: c.Knotstream.RetryInterval,
MaxRetryInterval: c.Knotstream.MaxRetryInterval,
ConnectionTimeout: c.Knotstream.ConnectionTimeout,
···
return ec.NewConsumer(cfg), nil
-
func knotIngester(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc {
return func(ctx context.Context, source ec.Source, msg ec.Message) error {
case tangled.GitRefUpdateNSID:
···
return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
for _, ke := range knownEmails {
···
-
if err := db.AddPunch(d, punch); err != nil {
-
err = pc.Enqueue(posthog.Capture{
-
DistinctId: record.CommitterDid,
-
Event: "git_ref_update",
-
// non-fatal, TODO: log this
func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {
···
···
"tangled.sh/tangled.sh/core/workflow"
"github.com/bluesky-social/indigo/atproto/syntax"
+
"github.com/go-git/go-git/v5/plumbing"
"github.com/posthog/posthog-go"
···
cfg := ec.ConsumerConfig{
+
ProcessFunc: knotIngester(d, enforcer, posthog, c.Core.Dev),
RetryInterval: c.Knotstream.RetryInterval,
MaxRetryInterval: c.Knotstream.MaxRetryInterval,
ConnectionTimeout: c.Knotstream.ConnectionTimeout,
···
return ec.NewConsumer(cfg), nil
+
func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc {
return func(ctx context.Context, source ec.Source, msg ec.Message) error {
case tangled.GitRefUpdateNSID:
···
return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
+
err1 := populatePunchcard(d, record)
+
err2 := updateRepoLanguages(d, record)
+
err3 = pc.Enqueue(posthog.Capture{
+
DistinctId: record.CommitterDid,
+
Event: "git_ref_update",
+
return errors.Join(err1, err2, err3)
+
func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
for _, ke := range knownEmails {
···
+
return db.AddPunch(d, punch)
+
func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
+
if record.Meta == nil && record.Meta.LangBreakdown == nil {
+
return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName)
+
repos, err := db.GetRepos(
+
db.FilterEq("did", record.RepoDid),
+
db.FilterEq("name", record.RepoName),
+
return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err)
+
return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
+
ref := plumbing.ReferenceName(record.Ref)
+
return fmt.Errorf("%s is not a valid reference name", ref)
+
var langs []db.RepoLanguage
+
for _, l := range record.Meta.LangBreakdown.Inputs {
+
langs = append(langs, db.RepoLanguage{
+
return db.InsertRepoLanguages(d, langs)
func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {