forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "slices"
9 "time"
10
11 "tangled.org/core/api/tangled"
12 "tangled.org/core/appview/cache"
13 "tangled.org/core/appview/config"
14 "tangled.org/core/appview/db"
15 "tangled.org/core/appview/models"
16 ec "tangled.org/core/eventconsumer"
17 "tangled.org/core/eventconsumer/cursor"
18 "tangled.org/core/log"
19 "tangled.org/core/rbac"
20 "tangled.org/core/workflow"
21
22 "github.com/bluesky-social/indigo/atproto/syntax"
23 "github.com/go-git/go-git/v5/plumbing"
24 "github.com/posthog/posthog-go"
25)
26
27func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*ec.Consumer, error) {
28 logger := log.FromContext(ctx)
29 logger = log.SubLogger(logger, "knotstream")
30
31 knots, err := db.GetRegistrations(
32 d,
33 db.FilterIsNot("registered", "null"),
34 )
35 if err != nil {
36 return nil, err
37 }
38
39 srcs := make(map[ec.Source]struct{})
40 for _, k := range knots {
41 s := ec.NewKnotSource(k.Domain)
42 srcs[s] = struct{}{}
43 }
44
45 cache := cache.New(c.Redis.Addr)
46 cursorStore := cursor.NewRedisCursorStore(cache)
47
48 cfg := ec.ConsumerConfig{
49 Sources: srcs,
50 ProcessFunc: knotIngester(d, enforcer, posthog, c.Core.Dev),
51 RetryInterval: c.Knotstream.RetryInterval,
52 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
53 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
54 WorkerCount: c.Knotstream.WorkerCount,
55 QueueSize: c.Knotstream.QueueSize,
56 Logger: logger,
57 Dev: c.Core.Dev,
58 CursorStore: &cursorStore,
59 }
60
61 return ec.NewConsumer(cfg), nil
62}
63
64func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc {
65 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
66 switch msg.Nsid {
67 case tangled.GitRefUpdateNSID:
68 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg)
69 case tangled.PipelineNSID:
70 return ingestPipeline(d, source, msg)
71 }
72
73 return nil
74 }
75}
76
77func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source ec.Source, msg ec.Message) error {
78 var record tangled.GitRefUpdate
79 err := json.Unmarshal(msg.EventJson, &record)
80 if err != nil {
81 return err
82 }
83
84 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
85 if err != nil {
86 return err
87 }
88 if !slices.Contains(knownKnots, source.Key()) {
89 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
90 }
91
92 err1 := populatePunchcard(d, record)
93 err2 := updateRepoLanguages(d, record)
94
95 var err3 error
96 if !dev {
97 err3 = pc.Enqueue(posthog.Capture{
98 DistinctId: record.CommitterDid,
99 Event: "git_ref_update",
100 })
101 }
102
103 return errors.Join(err1, err2, err3)
104}
105
106func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
107 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
108 if err != nil {
109 return err
110 }
111
112 count := 0
113 for _, ke := range knownEmails {
114 if record.Meta == nil {
115 continue
116 }
117 if record.Meta.CommitCount == nil {
118 continue
119 }
120 for _, ce := range record.Meta.CommitCount.ByEmail {
121 if ce == nil {
122 continue
123 }
124 if ce.Email == ke.Address {
125 count += int(ce.Count)
126 }
127 }
128 }
129
130 punch := models.Punch{
131 Did: record.CommitterDid,
132 Date: time.Now(),
133 Count: count,
134 }
135 return db.AddPunch(d, punch)
136}
137
138func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
139 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil {
140 return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName)
141 }
142
143 repos, err := db.GetRepos(
144 d,
145 0,
146 db.FilterEq("did", record.RepoDid),
147 db.FilterEq("name", record.RepoName),
148 )
149 if err != nil {
150 return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err)
151 }
152 if len(repos) != 1 {
153 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
154 }
155 repo := repos[0]
156
157 ref := plumbing.ReferenceName(record.Ref)
158 if !ref.IsBranch() {
159 return fmt.Errorf("%s is not a valid reference name", ref)
160 }
161
162 var langs []models.RepoLanguage
163 for _, l := range record.Meta.LangBreakdown.Inputs {
164 if l == nil {
165 continue
166 }
167
168 langs = append(langs, models.RepoLanguage{
169 RepoAt: repo.RepoAt(),
170 Ref: ref.Short(),
171 IsDefaultRef: record.Meta.IsDefaultRef,
172 Language: l.Lang,
173 Bytes: l.Size,
174 })
175 }
176
177 tx, err := d.Begin()
178 if err != nil {
179 return err
180 }
181 defer tx.Rollback()
182
183 // update appview's cache
184 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs)
185 if err != nil {
186 fmt.Printf("failed; %s\n", err)
187 // non-fatal
188 }
189
190 return tx.Commit()
191}
192
193func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {
194 var record tangled.Pipeline
195 err := json.Unmarshal(msg.EventJson, &record)
196 if err != nil {
197 return err
198 }
199
200 if record.TriggerMetadata == nil {
201 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
202 }
203
204 if record.TriggerMetadata.Repo == nil {
205 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
206 }
207
208 // does this repo have a spindle configured?
209 repos, err := db.GetRepos(
210 d,
211 0,
212 db.FilterEq("did", record.TriggerMetadata.Repo.Did),
213 db.FilterEq("name", record.TriggerMetadata.Repo.Repo),
214 )
215 if err != nil {
216 return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err)
217 }
218 if len(repos) != 1 {
219 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
220 }
221 if repos[0].Spindle == "" {
222 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
223 }
224
225 // trigger info
226 var trigger models.Trigger
227 var sha string
228 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
229 switch trigger.Kind {
230 case workflow.TriggerKindPush:
231 trigger.PushRef = &record.TriggerMetadata.Push.Ref
232 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
233 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
234 sha = *trigger.PushNewSha
235 case workflow.TriggerKindPullRequest:
236 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
237 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
238 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
239 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
240 sha = *trigger.PRSourceSha
241 }
242
243 tx, err := d.Begin()
244 if err != nil {
245 return fmt.Errorf("failed to start txn: %w", err)
246 }
247
248 triggerId, err := db.AddTrigger(tx, trigger)
249 if err != nil {
250 return fmt.Errorf("failed to add trigger entry: %w", err)
251 }
252
253 pipeline := models.Pipeline{
254 Rkey: msg.Rkey,
255 Knot: source.Key(),
256 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
257 RepoName: record.TriggerMetadata.Repo.Repo,
258 TriggerId: int(triggerId),
259 Sha: sha,
260 }
261
262 err = db.AddPipeline(tx, pipeline)
263 if err != nil {
264 return fmt.Errorf("failed to add pipeline: %w", err)
265 }
266
267 err = tx.Commit()
268 if err != nil {
269 return fmt.Errorf("failed to commit txn: %w", err)
270 }
271
272 return nil
273}