1package state
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "slices"
8 "time"
9
10 "tangled.sh/tangled.sh/core/api/tangled"
11 "tangled.sh/tangled.sh/core/appview/cache"
12 "tangled.sh/tangled.sh/core/appview/config"
13 "tangled.sh/tangled.sh/core/appview/db"
14 ec "tangled.sh/tangled.sh/core/eventconsumer"
15 "tangled.sh/tangled.sh/core/eventconsumer/cursor"
16 "tangled.sh/tangled.sh/core/log"
17 "tangled.sh/tangled.sh/core/rbac"
18 "tangled.sh/tangled.sh/core/workflow"
19
20 "github.com/bluesky-social/indigo/atproto/syntax"
21 "github.com/posthog/posthog-go"
22)
23
24func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*ec.Consumer, error) {
25 knots, err := db.GetCompletedRegistrations(d)
26 if err != nil {
27 return nil, err
28 }
29
30 srcs := make(map[ec.Source]struct{})
31 for _, k := range knots {
32 s := ec.NewKnotSource(k)
33 srcs[s] = struct{}{}
34 }
35
36 logger := log.New("knotstream")
37 cache := cache.New(c.Redis.Addr)
38 cursorStore := cursor.NewRedisCursorStore(cache)
39
40 cfg := ec.ConsumerConfig{
41 Sources: srcs,
42 ProcessFunc: knotIngester(ctx, d, enforcer, posthog, c.Core.Dev),
43 RetryInterval: c.Knotstream.RetryInterval,
44 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
45 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
46 WorkerCount: c.Knotstream.WorkerCount,
47 QueueSize: c.Knotstream.QueueSize,
48 Logger: logger,
49 Dev: c.Core.Dev,
50 CursorStore: &cursorStore,
51 }
52
53 return ec.NewConsumer(cfg), nil
54}
55
56func knotIngester(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc {
57 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
58 switch msg.Nsid {
59 case tangled.GitRefUpdateNSID:
60 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg)
61 case tangled.PipelineNSID:
62 return ingestPipeline(d, source, msg)
63 }
64
65 return nil
66 }
67}
68
69func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source ec.Source, msg ec.Message) error {
70 var record tangled.GitRefUpdate
71 err := json.Unmarshal(msg.EventJson, &record)
72 if err != nil {
73 return err
74 }
75
76 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
77 if err != nil {
78 return err
79 }
80 if !slices.Contains(knownKnots, source.Key()) {
81 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
82 }
83
84 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
85 if err != nil {
86 return err
87 }
88 count := 0
89 for _, ke := range knownEmails {
90 if record.Meta == nil {
91 continue
92 }
93 if record.Meta.CommitCount == nil {
94 continue
95 }
96 for _, ce := range record.Meta.CommitCount.ByEmail {
97 if ce == nil {
98 continue
99 }
100 if ce.Email == ke.Address {
101 count += int(ce.Count)
102 }
103 }
104 }
105
106 punch := db.Punch{
107 Did: record.CommitterDid,
108 Date: time.Now(),
109 Count: count,
110 }
111 if err := db.AddPunch(d, punch); err != nil {
112 return err
113 }
114
115 if !dev {
116 err = pc.Enqueue(posthog.Capture{
117 DistinctId: record.CommitterDid,
118 Event: "git_ref_update",
119 })
120 if err != nil {
121 // non-fatal, TODO: log this
122 }
123 }
124
125 return nil
126}
127
128func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {
129 var record tangled.Pipeline
130 err := json.Unmarshal(msg.EventJson, &record)
131 if err != nil {
132 return err
133 }
134
135 if record.TriggerMetadata == nil {
136 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
137 }
138
139 if record.TriggerMetadata.Repo == nil {
140 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
141 }
142
143 // trigger info
144 var trigger db.Trigger
145 var sha string
146 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
147 switch trigger.Kind {
148 case workflow.TriggerKindPush:
149 trigger.PushRef = &record.TriggerMetadata.Push.Ref
150 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
151 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
152 sha = *trigger.PushNewSha
153 case workflow.TriggerKindPullRequest:
154 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
155 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
156 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
157 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
158 sha = *trigger.PRSourceSha
159 }
160
161 tx, err := d.Begin()
162 if err != nil {
163 return fmt.Errorf("failed to start txn: %w", err)
164 }
165
166 triggerId, err := db.AddTrigger(tx, trigger)
167 if err != nil {
168 return fmt.Errorf("failed to add trigger entry: %w", err)
169 }
170
171 pipeline := db.Pipeline{
172 Rkey: msg.Rkey,
173 Knot: source.Key(),
174 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
175 RepoName: record.TriggerMetadata.Repo.Repo,
176 TriggerId: int(triggerId),
177 Sha: sha,
178 }
179
180 err = db.AddPipeline(tx, pipeline)
181 if err != nil {
182 return fmt.Errorf("failed to add pipeline: %w", err)
183 }
184
185 err = tx.Commit()
186 if err != nil {
187 return fmt.Errorf("failed to commit txn: %w", err)
188 }
189
190 return nil
191}