···
···
"tangled.sh/tangled.sh/core/workflow"
"github.com/bluesky-social/indigo/atproto/syntax"
22
+
"github.com/go-git/go-git/v5/plumbing"
"github.com/posthog/posthog-go"
···
cfg := ec.ConsumerConfig{
42
-
ProcessFunc: knotIngester(ctx, d, enforcer, posthog, c.Core.Dev),
44
+
ProcessFunc: knotIngester(d, enforcer, posthog, c.Core.Dev),
RetryInterval: c.Knotstream.RetryInterval,
MaxRetryInterval: c.Knotstream.MaxRetryInterval,
ConnectionTimeout: c.Knotstream.ConnectionTimeout,
···
return ec.NewConsumer(cfg), nil
56
-
func knotIngester(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc {
58
+
func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc {
return func(ctx context.Context, source ec.Source, msg ec.Message) error {
case tangled.GitRefUpdateNSID:
···
return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
86
+
err1 := populatePunchcard(d, record)
87
+
err2 := updateRepoLanguages(d, record)
91
+
err3 = pc.Enqueue(posthog.Capture{
92
+
DistinctId: record.CommitterDid,
93
+
Event: "git_ref_update",
97
+
return errors.Join(err1, err2, err3)
100
+
func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
for _, ke := range knownEmails {
···
111
-
if err := db.AddPunch(d, punch); err != nil {
129
+
return db.AddPunch(d, punch)
132
+
func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
133
+
if record.Meta == nil && record.Meta.LangBreakdown == nil {
134
+
return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName)
116
-
err = pc.Enqueue(posthog.Capture{
117
-
DistinctId: record.CommitterDid,
118
-
Event: "git_ref_update",
121
-
// non-fatal, TODO: log this
137
+
repos, err := db.GetRepos(
139
+
db.FilterEq("did", record.RepoDid),
140
+
db.FilterEq("name", record.RepoName),
143
+
return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err)
145
+
if len(repos) != 1 {
146
+
return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
150
+
ref := plumbing.ReferenceName(record.Ref)
151
+
if !ref.IsBranch() {
152
+
return fmt.Errorf("%s is not a valid reference name", ref)
155
+
var langs []db.RepoLanguage
156
+
for _, l := range record.Meta.LangBreakdown.Inputs {
161
+
langs = append(langs, db.RepoLanguage{
162
+
RepoAt: repo.RepoAt(),
169
+
return db.InsertRepoLanguages(d, langs)
func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {