1package state
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "slices"
9 "time"
10
11 "tangled.sh/tangled.sh/core/api/tangled"
12 "tangled.sh/tangled.sh/core/appview/cache"
13 "tangled.sh/tangled.sh/core/appview/config"
14 "tangled.sh/tangled.sh/core/appview/db"
15 ec "tangled.sh/tangled.sh/core/eventconsumer"
16 "tangled.sh/tangled.sh/core/eventconsumer/cursor"
17 "tangled.sh/tangled.sh/core/log"
18 "tangled.sh/tangled.sh/core/rbac"
19 "tangled.sh/tangled.sh/core/workflow"
20
21 "github.com/bluesky-social/indigo/atproto/syntax"
22 "github.com/go-git/go-git/v5/plumbing"
23 "github.com/posthog/posthog-go"
24)
25
26func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*ec.Consumer, error) {
27 knots, err := db.GetCompletedRegistrations(d)
28 if err != nil {
29 return nil, err
30 }
31
32 srcs := make(map[ec.Source]struct{})
33 for _, k := range knots {
34 s := ec.NewKnotSource(k)
35 srcs[s] = struct{}{}
36 }
37
38 logger := log.New("knotstream")
39 cache := cache.New(c.Redis.Addr)
40 cursorStore := cursor.NewRedisCursorStore(cache)
41
42 cfg := ec.ConsumerConfig{
43 Sources: srcs,
44 ProcessFunc: knotIngester(d, enforcer, posthog, c.Core.Dev),
45 RetryInterval: c.Knotstream.RetryInterval,
46 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
47 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
48 WorkerCount: c.Knotstream.WorkerCount,
49 QueueSize: c.Knotstream.QueueSize,
50 Logger: logger,
51 Dev: c.Core.Dev,
52 CursorStore: &cursorStore,
53 }
54
55 return ec.NewConsumer(cfg), nil
56}
57
58func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc {
59 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
60 switch msg.Nsid {
61 case tangled.GitRefUpdateNSID:
62 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg)
63 case tangled.PipelineNSID:
64 return ingestPipeline(d, source, msg)
65 }
66
67 return nil
68 }
69}
70
71func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source ec.Source, msg ec.Message) error {
72 var record tangled.GitRefUpdate
73 err := json.Unmarshal(msg.EventJson, &record)
74 if err != nil {
75 return err
76 }
77
78 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
79 if err != nil {
80 return err
81 }
82 if !slices.Contains(knownKnots, source.Key()) {
83 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
84 }
85
86 err1 := populatePunchcard(d, record)
87 err2 := updateRepoLanguages(d, record)
88
89 var err3 error
90 if !dev {
91 err3 = pc.Enqueue(posthog.Capture{
92 DistinctId: record.CommitterDid,
93 Event: "git_ref_update",
94 })
95 }
96
97 return errors.Join(err1, err2, err3)
98}
99
100func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
101 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
102 if err != nil {
103 return err
104 }
105
106 count := 0
107 for _, ke := range knownEmails {
108 if record.Meta == nil {
109 continue
110 }
111 if record.Meta.CommitCount == nil {
112 continue
113 }
114 for _, ce := range record.Meta.CommitCount.ByEmail {
115 if ce == nil {
116 continue
117 }
118 if ce.Email == ke.Address {
119 count += int(ce.Count)
120 }
121 }
122 }
123
124 punch := db.Punch{
125 Did: record.CommitterDid,
126 Date: time.Now(),
127 Count: count,
128 }
129 return db.AddPunch(d, punch)
130}
131
132func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
133 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil {
134 return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName)
135 }
136
137 repos, err := db.GetRepos(
138 d,
139 0,
140 db.FilterEq("did", record.RepoDid),
141 db.FilterEq("name", record.RepoName),
142 )
143 if err != nil {
144 return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err)
145 }
146 if len(repos) != 1 {
147 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
148 }
149 repo := repos[0]
150
151 ref := plumbing.ReferenceName(record.Ref)
152 if !ref.IsBranch() {
153 return fmt.Errorf("%s is not a valid reference name", ref)
154 }
155
156 var langs []db.RepoLanguage
157 for _, l := range record.Meta.LangBreakdown.Inputs {
158 if l == nil {
159 continue
160 }
161
162 langs = append(langs, db.RepoLanguage{
163 RepoAt: repo.RepoAt(),
164 Ref: ref.Short(),
165 IsDefaultRef: record.Meta.IsDefaultRef,
166 Language: l.Lang,
167 Bytes: l.Size,
168 })
169 }
170
171 return db.InsertRepoLanguages(d, langs)
172}
173
174func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {
175 var record tangled.Pipeline
176 err := json.Unmarshal(msg.EventJson, &record)
177 if err != nil {
178 return err
179 }
180
181 if record.TriggerMetadata == nil {
182 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
183 }
184
185 if record.TriggerMetadata.Repo == nil {
186 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
187 }
188
189 // does this repo have a spindle configured?
190 repos, err := db.GetRepos(
191 d,
192 0,
193 db.FilterEq("did", record.TriggerMetadata.Repo.Did),
194 db.FilterEq("name", record.TriggerMetadata.Repo.Repo),
195 )
196 if err != nil {
197 return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err)
198 }
199 if len(repos) != 1 {
200 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
201 }
202 if repos[0].Spindle == "" {
203 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
204 }
205
206 // trigger info
207 var trigger db.Trigger
208 var sha string
209 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
210 switch trigger.Kind {
211 case workflow.TriggerKindPush:
212 trigger.PushRef = &record.TriggerMetadata.Push.Ref
213 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
214 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
215 sha = *trigger.PushNewSha
216 case workflow.TriggerKindPullRequest:
217 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
218 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
219 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
220 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
221 sha = *trigger.PRSourceSha
222 }
223
224 tx, err := d.Begin()
225 if err != nil {
226 return fmt.Errorf("failed to start txn: %w", err)
227 }
228
229 triggerId, err := db.AddTrigger(tx, trigger)
230 if err != nil {
231 return fmt.Errorf("failed to add trigger entry: %w", err)
232 }
233
234 pipeline := db.Pipeline{
235 Rkey: msg.Rkey,
236 Knot: source.Key(),
237 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
238 RepoName: record.TriggerMetadata.Repo.Repo,
239 TriggerId: int(triggerId),
240 Sha: sha,
241 }
242
243 err = db.AddPipeline(tx, pipeline)
244 if err != nil {
245 return fmt.Errorf("failed to add pipeline: %w", err)
246 }
247
248 err = tx.Commit()
249 if err != nil {
250 return fmt.Errorf("failed to commit txn: %w", err)
251 }
252
253 return nil
254}