1package state
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "slices"
9 "time"
10
11 "tangled.sh/tangled.sh/core/api/tangled"
12 "tangled.sh/tangled.sh/core/appview/cache"
13 "tangled.sh/tangled.sh/core/appview/config"
14 "tangled.sh/tangled.sh/core/appview/db"
15 ec "tangled.sh/tangled.sh/core/eventconsumer"
16 "tangled.sh/tangled.sh/core/eventconsumer/cursor"
17 "tangled.sh/tangled.sh/core/log"
18 "tangled.sh/tangled.sh/core/rbac"
19 "tangled.sh/tangled.sh/core/workflow"
20
21 "github.com/bluesky-social/indigo/atproto/syntax"
22 "github.com/go-git/go-git/v5/plumbing"
23 "github.com/posthog/posthog-go"
24)
25
26func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*ec.Consumer, error) {
27 knots, err := db.GetRegistrations(
28 d,
29 db.FilterIsNot("registered", "null"),
30 )
31 if err != nil {
32 return nil, err
33 }
34
35 srcs := make(map[ec.Source]struct{})
36 for _, k := range knots {
37 s := ec.NewKnotSource(k.Domain)
38 srcs[s] = struct{}{}
39 }
40
41 logger := log.New("knotstream")
42 cache := cache.New(c.Redis.Addr)
43 cursorStore := cursor.NewRedisCursorStore(cache)
44
45 cfg := ec.ConsumerConfig{
46 Sources: srcs,
47 ProcessFunc: knotIngester(d, enforcer, posthog, c.Core.Dev),
48 RetryInterval: c.Knotstream.RetryInterval,
49 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
50 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
51 WorkerCount: c.Knotstream.WorkerCount,
52 QueueSize: c.Knotstream.QueueSize,
53 Logger: logger,
54 Dev: c.Core.Dev,
55 CursorStore: &cursorStore,
56 }
57
58 return ec.NewConsumer(cfg), nil
59}
60
61func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc {
62 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
63 switch msg.Nsid {
64 case tangled.GitRefUpdateNSID:
65 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg)
66 case tangled.PipelineNSID:
67 return ingestPipeline(d, source, msg)
68 }
69
70 return nil
71 }
72}
73
74func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source ec.Source, msg ec.Message) error {
75 var record tangled.GitRefUpdate
76 err := json.Unmarshal(msg.EventJson, &record)
77 if err != nil {
78 return err
79 }
80
81 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
82 if err != nil {
83 return err
84 }
85 if !slices.Contains(knownKnots, source.Key()) {
86 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
87 }
88
89 err1 := populatePunchcard(d, record)
90 err2 := updateRepoLanguages(d, record)
91
92 var err3 error
93 if !dev {
94 err3 = pc.Enqueue(posthog.Capture{
95 DistinctId: record.CommitterDid,
96 Event: "git_ref_update",
97 })
98 }
99
100 return errors.Join(err1, err2, err3)
101}
102
103func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
104 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
105 if err != nil {
106 return err
107 }
108
109 count := 0
110 for _, ke := range knownEmails {
111 if record.Meta == nil {
112 continue
113 }
114 if record.Meta.CommitCount == nil {
115 continue
116 }
117 for _, ce := range record.Meta.CommitCount.ByEmail {
118 if ce == nil {
119 continue
120 }
121 if ce.Email == ke.Address {
122 count += int(ce.Count)
123 }
124 }
125 }
126
127 punch := db.Punch{
128 Did: record.CommitterDid,
129 Date: time.Now(),
130 Count: count,
131 }
132 return db.AddPunch(d, punch)
133}
134
135func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
136 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil {
137 return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName)
138 }
139
140 repos, err := db.GetRepos(
141 d,
142 0,
143 db.FilterEq("did", record.RepoDid),
144 db.FilterEq("name", record.RepoName),
145 )
146 if err != nil {
147 return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err)
148 }
149 if len(repos) != 1 {
150 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
151 }
152 repo := repos[0]
153
154 ref := plumbing.ReferenceName(record.Ref)
155 if !ref.IsBranch() {
156 return fmt.Errorf("%s is not a valid reference name", ref)
157 }
158
159 var langs []db.RepoLanguage
160 for _, l := range record.Meta.LangBreakdown.Inputs {
161 if l == nil {
162 continue
163 }
164
165 langs = append(langs, db.RepoLanguage{
166 RepoAt: repo.RepoAt(),
167 Ref: ref.Short(),
168 IsDefaultRef: record.Meta.IsDefaultRef,
169 Language: l.Lang,
170 Bytes: l.Size,
171 })
172 }
173
174 return db.InsertRepoLanguages(d, langs)
175}
176
177func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {
178 var record tangled.Pipeline
179 err := json.Unmarshal(msg.EventJson, &record)
180 if err != nil {
181 return err
182 }
183
184 if record.TriggerMetadata == nil {
185 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
186 }
187
188 if record.TriggerMetadata.Repo == nil {
189 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
190 }
191
192 // does this repo have a spindle configured?
193 repos, err := db.GetRepos(
194 d,
195 0,
196 db.FilterEq("did", record.TriggerMetadata.Repo.Did),
197 db.FilterEq("name", record.TriggerMetadata.Repo.Repo),
198 )
199 if err != nil {
200 return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err)
201 }
202 if len(repos) != 1 {
203 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
204 }
205 if repos[0].Spindle == "" {
206 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
207 }
208
209 // trigger info
210 var trigger db.Trigger
211 var sha string
212 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
213 switch trigger.Kind {
214 case workflow.TriggerKindPush:
215 trigger.PushRef = &record.TriggerMetadata.Push.Ref
216 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
217 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
218 sha = *trigger.PushNewSha
219 case workflow.TriggerKindPullRequest:
220 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
221 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
222 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
223 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
224 sha = *trigger.PRSourceSha
225 }
226
227 tx, err := d.Begin()
228 if err != nil {
229 return fmt.Errorf("failed to start txn: %w", err)
230 }
231
232 triggerId, err := db.AddTrigger(tx, trigger)
233 if err != nil {
234 return fmt.Errorf("failed to add trigger entry: %w", err)
235 }
236
237 pipeline := db.Pipeline{
238 Rkey: msg.Rkey,
239 Knot: source.Key(),
240 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
241 RepoName: record.TriggerMetadata.Repo.Repo,
242 TriggerId: int(triggerId),
243 Sha: sha,
244 }
245
246 err = db.AddPipeline(tx, pipeline)
247 if err != nil {
248 return fmt.Errorf("failed to add pipeline: %w", err)
249 }
250
251 err = tx.Commit()
252 if err != nil {
253 return fmt.Errorf("failed to commit txn: %w", err)
254 }
255
256 return nil
257}