forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "slices"
9 "time"
10
11 "tangled.org/core/api/tangled"
12 "tangled.org/core/appview/cache"
13 "tangled.org/core/appview/config"
14 "tangled.org/core/appview/db"
15 "tangled.org/core/appview/models"
16 ec "tangled.org/core/eventconsumer"
17 "tangled.org/core/eventconsumer/cursor"
18 "tangled.org/core/log"
19 "tangled.org/core/orm"
20 "tangled.org/core/rbac"
21 "tangled.org/core/workflow"
22
23 "github.com/bluesky-social/indigo/atproto/syntax"
24 "github.com/go-git/go-git/v5/plumbing"
25 "github.com/posthog/posthog-go"
26)
27
28func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*ec.Consumer, error) {
29 logger := log.FromContext(ctx)
30 logger = log.SubLogger(logger, "knotstream")
31
32 knots, err := db.GetRegistrations(
33 d,
34 orm.FilterIsNot("registered", "null"),
35 )
36 if err != nil {
37 return nil, err
38 }
39
40 srcs := make(map[ec.Source]struct{})
41 for _, k := range knots {
42 s := ec.NewKnotSource(k.Domain)
43 srcs[s] = struct{}{}
44 }
45
46 cache := cache.New(c.Redis.Addr)
47 cursorStore := cursor.NewRedisCursorStore(cache)
48
49 cfg := ec.ConsumerConfig{
50 Sources: srcs,
51 ProcessFunc: knotIngester(d, enforcer, posthog, c.Core.Dev),
52 RetryInterval: c.Knotstream.RetryInterval,
53 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
54 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
55 WorkerCount: c.Knotstream.WorkerCount,
56 QueueSize: c.Knotstream.QueueSize,
57 Logger: logger,
58 Dev: c.Core.Dev,
59 CursorStore: &cursorStore,
60 }
61
62 return ec.NewConsumer(cfg), nil
63}
64
65func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc {
66 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
67 switch msg.Nsid {
68 case tangled.GitRefUpdateNSID:
69 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg)
70 case tangled.PipelineNSID:
71 return ingestPipeline(d, source, msg)
72 }
73
74 return nil
75 }
76}
77
78func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source ec.Source, msg ec.Message) error {
79 var record tangled.GitRefUpdate
80 err := json.Unmarshal(msg.EventJson, &record)
81 if err != nil {
82 return err
83 }
84
85 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
86 if err != nil {
87 return err
88 }
89 if !slices.Contains(knownKnots, source.Key()) {
90 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
91 }
92
93 err1 := populatePunchcard(d, record)
94 err2 := updateRepoLanguages(d, record)
95
96 var err3 error
97 if !dev {
98 err3 = pc.Enqueue(posthog.Capture{
99 DistinctId: record.CommitterDid,
100 Event: "git_ref_update",
101 })
102 }
103
104 return errors.Join(err1, err2, err3)
105}
106
107func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
108 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
109 if err != nil {
110 return err
111 }
112
113 count := 0
114 for _, ke := range knownEmails {
115 if record.Meta == nil {
116 continue
117 }
118 if record.Meta.CommitCount == nil {
119 continue
120 }
121 for _, ce := range record.Meta.CommitCount.ByEmail {
122 if ce == nil {
123 continue
124 }
125 if ce.Email == ke.Address {
126 count += int(ce.Count)
127 }
128 }
129 }
130
131 punch := models.Punch{
132 Did: record.CommitterDid,
133 Date: time.Now(),
134 Count: count,
135 }
136 return db.AddPunch(d, punch)
137}
138
139func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
140 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil {
141 return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName)
142 }
143
144 repos, err := db.GetRepos(
145 d,
146 0,
147 orm.FilterEq("did", record.RepoDid),
148 orm.FilterEq("name", record.RepoName),
149 )
150 if err != nil {
151 return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err)
152 }
153 if len(repos) != 1 {
154 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
155 }
156 repo := repos[0]
157
158 ref := plumbing.ReferenceName(record.Ref)
159 if !ref.IsBranch() {
160 return fmt.Errorf("%s is not a valid reference name", ref)
161 }
162
163 var langs []models.RepoLanguage
164 for _, l := range record.Meta.LangBreakdown.Inputs {
165 if l == nil {
166 continue
167 }
168
169 langs = append(langs, models.RepoLanguage{
170 RepoAt: repo.RepoAt(),
171 Ref: ref.Short(),
172 IsDefaultRef: record.Meta.IsDefaultRef,
173 Language: l.Lang,
174 Bytes: l.Size,
175 })
176 }
177
178 tx, err := d.Begin()
179 if err != nil {
180 return err
181 }
182 defer tx.Rollback()
183
184 // update appview's cache
185 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs)
186 if err != nil {
187 fmt.Printf("failed; %s\n", err)
188 // non-fatal
189 }
190
191 return tx.Commit()
192}
193
194func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {
195 var record tangled.Pipeline
196 err := json.Unmarshal(msg.EventJson, &record)
197 if err != nil {
198 return err
199 }
200
201 if record.TriggerMetadata == nil {
202 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
203 }
204
205 if record.TriggerMetadata.Repo == nil {
206 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
207 }
208
209 // does this repo have a spindle configured?
210 repos, err := db.GetRepos(
211 d,
212 0,
213 orm.FilterEq("did", record.TriggerMetadata.Repo.Did),
214 orm.FilterEq("name", record.TriggerMetadata.Repo.Repo),
215 )
216 if err != nil {
217 return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err)
218 }
219 if len(repos) != 1 {
220 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
221 }
222 if repos[0].Spindle == "" {
223 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
224 }
225
226 // trigger info
227 var trigger models.Trigger
228 var sha string
229 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
230 switch trigger.Kind {
231 case workflow.TriggerKindPush:
232 trigger.PushRef = &record.TriggerMetadata.Push.Ref
233 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
234 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
235 sha = *trigger.PushNewSha
236 case workflow.TriggerKindPullRequest:
237 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
238 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
239 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
240 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
241 sha = *trigger.PRSourceSha
242 }
243
244 tx, err := d.Begin()
245 if err != nil {
246 return fmt.Errorf("failed to start txn: %w", err)
247 }
248
249 triggerId, err := db.AddTrigger(tx, trigger)
250 if err != nil {
251 return fmt.Errorf("failed to add trigger entry: %w", err)
252 }
253
254 pipeline := models.Pipeline{
255 Rkey: msg.Rkey,
256 Knot: source.Key(),
257 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
258 RepoName: record.TriggerMetadata.Repo.Repo,
259 TriggerId: int(triggerId),
260 Sha: sha,
261 }
262
263 err = db.AddPipeline(tx, pipeline)
264 if err != nil {
265 return fmt.Errorf("failed to add pipeline: %w", err)
266 }
267
268 err = tx.Commit()
269 if err != nil {
270 return fmt.Errorf("failed to commit txn: %w", err)
271 }
272
273 return nil
274}