1package state
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "slices"
9 "time"
10
11 "tangled.org/core/api/tangled"
12 "tangled.org/core/appview/cache"
13 "tangled.org/core/appview/config"
14 "tangled.org/core/appview/db"
15 "tangled.org/core/appview/models"
16 ec "tangled.org/core/eventconsumer"
17 "tangled.org/core/eventconsumer/cursor"
18 "tangled.org/core/log"
19 "tangled.org/core/rbac"
20 "tangled.org/core/workflow"
21
22 "github.com/bluesky-social/indigo/atproto/syntax"
23 "github.com/go-git/go-git/v5/plumbing"
24 "github.com/posthog/posthog-go"
25)
26
27func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*ec.Consumer, error) {
28 knots, err := db.GetRegistrations(
29 d,
30 db.FilterIsNot("registered", "null"),
31 )
32 if err != nil {
33 return nil, err
34 }
35
36 srcs := make(map[ec.Source]struct{})
37 for _, k := range knots {
38 s := ec.NewKnotSource(k.Domain)
39 srcs[s] = struct{}{}
40 }
41
42 logger := log.New("knotstream")
43 cache := cache.New(c.Redis.Addr)
44 cursorStore := cursor.NewRedisCursorStore(cache)
45
46 cfg := ec.ConsumerConfig{
47 Sources: srcs,
48 ProcessFunc: knotIngester(d, enforcer, posthog, c.Core.Dev),
49 RetryInterval: c.Knotstream.RetryInterval,
50 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
51 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
52 WorkerCount: c.Knotstream.WorkerCount,
53 QueueSize: c.Knotstream.QueueSize,
54 Logger: logger,
55 Dev: c.Core.Dev,
56 CursorStore: &cursorStore,
57 }
58
59 return ec.NewConsumer(cfg), nil
60}
61
62func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc {
63 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
64 switch msg.Nsid {
65 case tangled.GitRefUpdateNSID:
66 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg)
67 case tangled.PipelineNSID:
68 return ingestPipeline(d, source, msg)
69 }
70
71 return nil
72 }
73}
74
75func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source ec.Source, msg ec.Message) error {
76 var record tangled.GitRefUpdate
77 err := json.Unmarshal(msg.EventJson, &record)
78 if err != nil {
79 return err
80 }
81
82 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
83 if err != nil {
84 return err
85 }
86 if !slices.Contains(knownKnots, source.Key()) {
87 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
88 }
89
90 err1 := populatePunchcard(d, record)
91 err2 := updateRepoLanguages(d, record)
92
93 var err3 error
94 if !dev {
95 err3 = pc.Enqueue(posthog.Capture{
96 DistinctId: record.CommitterDid,
97 Event: "git_ref_update",
98 })
99 }
100
101 return errors.Join(err1, err2, err3)
102}
103
104func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
105 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
106 if err != nil {
107 return err
108 }
109
110 count := 0
111 for _, ke := range knownEmails {
112 if record.Meta == nil {
113 continue
114 }
115 if record.Meta.CommitCount == nil {
116 continue
117 }
118 for _, ce := range record.Meta.CommitCount.ByEmail {
119 if ce == nil {
120 continue
121 }
122 if ce.Email == ke.Address {
123 count += int(ce.Count)
124 }
125 }
126 }
127
128 punch := models.Punch{
129 Did: record.CommitterDid,
130 Date: time.Now(),
131 Count: count,
132 }
133 return db.AddPunch(d, punch)
134}
135
136func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
137 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil {
138 return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName)
139 }
140
141 repos, err := db.GetRepos(
142 d,
143 0,
144 db.FilterEq("did", record.RepoDid),
145 db.FilterEq("name", record.RepoName),
146 )
147 if err != nil {
148 return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err)
149 }
150 if len(repos) != 1 {
151 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
152 }
153 repo := repos[0]
154
155 ref := plumbing.ReferenceName(record.Ref)
156 if !ref.IsBranch() {
157 return fmt.Errorf("%s is not a valid reference name", ref)
158 }
159
160 var langs []models.RepoLanguage
161 for _, l := range record.Meta.LangBreakdown.Inputs {
162 if l == nil {
163 continue
164 }
165
166 langs = append(langs, models.RepoLanguage{
167 RepoAt: repo.RepoAt(),
168 Ref: ref.Short(),
169 IsDefaultRef: record.Meta.IsDefaultRef,
170 Language: l.Lang,
171 Bytes: l.Size,
172 })
173 }
174
175 tx, err := d.Begin()
176 if err != nil {
177 return err
178 }
179 defer tx.Rollback()
180
181 // update appview's cache
182 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs)
183 if err != nil {
184 fmt.Printf("failed; %s\n", err)
185 // non-fatal
186 }
187
188 return tx.Commit()
189}
190
191func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {
192 var record tangled.Pipeline
193 err := json.Unmarshal(msg.EventJson, &record)
194 if err != nil {
195 return err
196 }
197
198 if record.TriggerMetadata == nil {
199 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
200 }
201
202 if record.TriggerMetadata.Repo == nil {
203 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
204 }
205
206 // does this repo have a spindle configured?
207 repos, err := db.GetRepos(
208 d,
209 0,
210 db.FilterEq("did", record.TriggerMetadata.Repo.Did),
211 db.FilterEq("name", record.TriggerMetadata.Repo.Repo),
212 )
213 if err != nil {
214 return fmt.Errorf("failed to look for repo in DB: nsid %s, rkey %s, %w", msg.Nsid, msg.Rkey, err)
215 }
216 if len(repos) != 1 {
217 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
218 }
219 if repos[0].Spindle == "" {
220 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
221 }
222
223 // trigger info
224 var trigger models.Trigger
225 var sha string
226 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
227 switch trigger.Kind {
228 case workflow.TriggerKindPush:
229 trigger.PushRef = &record.TriggerMetadata.Push.Ref
230 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
231 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
232 sha = *trigger.PushNewSha
233 case workflow.TriggerKindPullRequest:
234 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
235 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
236 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
237 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
238 sha = *trigger.PRSourceSha
239 }
240
241 tx, err := d.Begin()
242 if err != nil {
243 return fmt.Errorf("failed to start txn: %w", err)
244 }
245
246 triggerId, err := db.AddTrigger(tx, trigger)
247 if err != nil {
248 return fmt.Errorf("failed to add trigger entry: %w", err)
249 }
250
251 pipeline := models.Pipeline{
252 Rkey: msg.Rkey,
253 Knot: source.Key(),
254 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
255 RepoName: record.TriggerMetadata.Repo.Repo,
256 TriggerId: int(triggerId),
257 Sha: sha,
258 }
259
260 err = db.AddPipeline(tx, pipeline)
261 if err != nil {
262 return fmt.Errorf("failed to add pipeline: %w", err)
263 }
264
265 err = tx.Commit()
266 if err != nil {
267 return fmt.Errorf("failed to commit txn: %w", err)
268 }
269
270 return nil
271}