package state import ( "context" "encoding/json" "fmt" "slices" "time" "tangled.sh/tangled.sh/core/api/tangled" "tangled.sh/tangled.sh/core/appview/cache" "tangled.sh/tangled.sh/core/appview/config" "tangled.sh/tangled.sh/core/appview/db" ec "tangled.sh/tangled.sh/core/eventconsumer" "tangled.sh/tangled.sh/core/eventconsumer/cursor" "tangled.sh/tangled.sh/core/log" "tangled.sh/tangled.sh/core/rbac" "tangled.sh/tangled.sh/core/workflow" "github.com/bluesky-social/indigo/atproto/syntax" "github.com/posthog/posthog-go" ) func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*ec.Consumer, error) { knots, err := db.GetCompletedRegistrations(d) if err != nil { return nil, err } srcs := make(map[ec.Source]struct{}) for _, k := range knots { s := ec.NewKnotSource(k) srcs[s] = struct{}{} } logger := log.New("knotstream") cache := cache.New(c.Redis.Addr) cursorStore := cursor.NewRedisCursorStore(cache) cfg := ec.ConsumerConfig{ Sources: srcs, ProcessFunc: knotIngester(ctx, d, enforcer, posthog, c.Core.Dev), RetryInterval: c.Knotstream.RetryInterval, MaxRetryInterval: c.Knotstream.MaxRetryInterval, ConnectionTimeout: c.Knotstream.ConnectionTimeout, WorkerCount: c.Knotstream.WorkerCount, QueueSize: c.Knotstream.QueueSize, Logger: logger, Dev: c.Core.Dev, CursorStore: &cursorStore, } return ec.NewConsumer(cfg), nil } func knotIngester(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc { return func(ctx context.Context, source ec.Source, msg ec.Message) error { switch msg.Nsid { case tangled.GitRefUpdateNSID: return ingestRefUpdate(d, enforcer, posthog, dev, source, msg) case tangled.PipelineNSID: return ingestPipeline(d, source, msg) } return nil } } func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source ec.Source, msg ec.Message) error { var record tangled.GitRefUpdate err := json.Unmarshal(msg.EventJson, &record) if err != nil { return err } knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) if err != nil { return err } if !slices.Contains(knownKnots, source.Key()) { return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) } knownEmails, err := db.GetAllEmails(d, record.CommitterDid) if err != nil { return err } count := 0 for _, ke := range knownEmails { if record.Meta == nil { continue } if record.Meta.CommitCount == nil { continue } for _, ce := range record.Meta.CommitCount.ByEmail { if ce == nil { continue } if ce.Email == ke.Address { count += int(ce.Count) } } } punch := db.Punch{ Did: record.CommitterDid, Date: time.Now(), Count: count, } if err := db.AddPunch(d, punch); err != nil { return err } if !dev { err = pc.Enqueue(posthog.Capture{ DistinctId: record.CommitterDid, Event: "git_ref_update", }) if err != nil { // non-fatal, TODO: log this } } return nil } func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error { var record tangled.Pipeline err := json.Unmarshal(msg.EventJson, &record) if err != nil { return err } if record.TriggerMetadata == nil { return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) } if record.TriggerMetadata.Repo == nil { return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) } // trigger info var trigger db.Trigger var sha string trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) switch trigger.Kind { case workflow.TriggerKindPush: trigger.PushRef = &record.TriggerMetadata.Push.Ref trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha sha = *trigger.PushNewSha case workflow.TriggerKindPullRequest: trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha trigger.PRAction = &record.TriggerMetadata.PullRequest.Action sha = *trigger.PRSourceSha } tx, err := d.Begin() if err != nil { return fmt.Errorf("failed to start txn: %w", err) } triggerId, err := db.AddTrigger(tx, trigger) if err != nil { return fmt.Errorf("failed to add trigger entry: %w", err) } pipeline := db.Pipeline{ Rkey: msg.Rkey, Knot: source.Key(), RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), RepoName: record.TriggerMetadata.Repo.Repo, TriggerId: int(triggerId), Sha: sha, } err = db.AddPipeline(tx, pipeline) if err != nil { return fmt.Errorf("failed to add pipeline: %w", err) } err = tx.Commit() if err != nil { return fmt.Errorf("failed to commit txn: %w", err) } return nil }