forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db
2
3import (
4 "fmt"
5 "slices"
6 "strings"
7 "time"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "github.com/go-git/go-git/v5/plumbing"
11 spindle "tangled.sh/tangled.sh/core/spindle/models"
12)
13
// Pipeline mirrors a row of the pipelines table, optionally decorated with
// the trigger that started it and the status history of its workflows.
type Pipeline struct {
	Id        int
	Rkey      string
	Knot      string
	RepoOwner syntax.DID
	RepoName  string
	TriggerId int
	Sha       string
	Created   time.Time

	// populate when querying for reverse mappings
	//
	// GetPipelineStatuses fills both fields; GetPipelines leaves them at their
	// zero values. Statuses is keyed by PipelineStatus.Workflow.
	Trigger  *Trigger
	Statuses map[string]WorkflowStatus
}
28
// WorkflowStatus groups the status records collected for one workflow of a
// pipeline.
type WorkflowStatus struct {
	// Data holds the status records. Latest treats the final element as the
	// most recent one; GetPipelineStatuses sorts it by Created, then ID.
	Data []PipelineStatus
}
32
33func (w WorkflowStatus) Latest() PipelineStatus {
34 return w.Data[len(w.Data)-1]
35}
36
37// time taken by this workflow to reach an "end state"
38func (w WorkflowStatus) TimeTaken() time.Duration {
39 var start, end *time.Time
40 for _, s := range w.Data {
41 if s.Status.IsStart() {
42 start = &s.Created
43 }
44 if s.Status.IsFinish() {
45 end = &s.Created
46 }
47 }
48
49 if start != nil && end != nil && end.After(*start) {
50 return end.Sub(*start)
51 }
52
53 return 0
54}
55
56func (p Pipeline) Counts() map[string]int {
57 m := make(map[string]int)
58 for _, w := range p.Statuses {
59 m[w.Latest().Status.String()] += 1
60 }
61 return m
62}
63
64func (p Pipeline) TimeTaken() time.Duration {
65 var s time.Duration
66 for _, w := range p.Statuses {
67 s += w.TimeTaken()
68 }
69 return s
70}
71
72func (p Pipeline) Workflows() []string {
73 var ws []string
74 for v := range p.Statuses {
75 ws = append(ws, v)
76 }
77 slices.Sort(ws)
78 return ws
79}
80
81// if we know that a spindle has picked up this pipeline, then it is Responding
82func (p Pipeline) IsResponding() bool {
83 return len(p.Statuses) != 0
84}
85
// Trigger mirrors a row of the triggers table: the event a pipeline was
// started by.
type Trigger struct {
	Id int
	// Kind is compared against "push" and "pull_request" by IsPush and
	// IsPullRequest.
	Kind string

	// push trigger fields
	// NOTE(review): presumably nil when the corresponding column is NULL,
	// e.g. for non-push triggers; confirm against the schema.
	PushRef    *string
	PushNewSha *string
	PushOldSha *string

	// pull request trigger fields
	// NOTE(review): presumably nil for non-pull-request triggers; confirm.
	PRSourceBranch *string
	PRTargetBranch *string
	PRSourceSha    *string
	PRAction       *string
}
101
102func (t *Trigger) IsPush() bool {
103 return t != nil && t.Kind == "push"
104}
105
106func (t *Trigger) IsPullRequest() bool {
107 return t != nil && t.Kind == "pull_request"
108}
109
110func (t *Trigger) TargetRef() string {
111 if t.IsPush() {
112 return plumbing.ReferenceName(*t.PushRef).Short()
113 } else if t.IsPullRequest() {
114 return *t.PRTargetBranch
115 }
116
117 return ""
118}
119
// PipelineStatus mirrors a row of the pipeline_statuses table: one status
// report for a single workflow of a pipeline.
type PipelineStatus struct {
	ID      int
	Spindle string
	Rkey    string
	// PipelineKnot and PipelineRkey together identify the pipeline this status
	// belongs to (matched against Pipeline.Knot and Pipeline.Rkey).
	PipelineKnot string
	PipelineRkey string
	Created      time.Time
	// Workflow is the key under which this status is grouped in
	// Pipeline.Statuses.
	Workflow string
	Status   spindle.StatusKind
	Error    *string
	ExitCode int
}
132
133func GetPipelines(e Execer, filters ...filter) ([]Pipeline, error) {
134 var pipelines []Pipeline
135
136 var conditions []string
137 var args []any
138 for _, filter := range filters {
139 conditions = append(conditions, filter.Condition())
140 args = append(args, filter.Arg()...)
141 }
142
143 whereClause := ""
144 if conditions != nil {
145 whereClause = " where " + strings.Join(conditions, " and ")
146 }
147
148 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause)
149
150 rows, err := e.Query(query, args...)
151
152 if err != nil {
153 return nil, err
154 }
155 defer rows.Close()
156
157 for rows.Next() {
158 var pipeline Pipeline
159 var createdAt string
160 err = rows.Scan(
161 &pipeline.Id,
162 &pipeline.Rkey,
163 &pipeline.Knot,
164 &pipeline.RepoOwner,
165 &pipeline.RepoName,
166 &pipeline.Sha,
167 &createdAt,
168 )
169 if err != nil {
170 return nil, err
171 }
172
173 if t, err := time.Parse(time.RFC3339, createdAt); err == nil {
174 pipeline.Created = t
175 }
176
177 pipelines = append(pipelines, pipeline)
178 }
179
180 if err = rows.Err(); err != nil {
181 return nil, err
182 }
183
184 return pipelines, nil
185}
186
187func AddPipeline(e Execer, pipeline Pipeline) error {
188 args := []any{
189 pipeline.Rkey,
190 pipeline.Knot,
191 pipeline.RepoOwner,
192 pipeline.RepoName,
193 pipeline.TriggerId,
194 pipeline.Sha,
195 }
196
197 placeholders := make([]string, len(args))
198 for i := range placeholders {
199 placeholders[i] = "?"
200 }
201
202 query := fmt.Sprintf(`
203 insert or ignore into pipelines (
204 rkey,
205 knot,
206 repo_owner,
207 repo_name,
208 trigger_id,
209 sha
210 ) values (%s)
211 `, strings.Join(placeholders, ","))
212
213 _, err := e.Exec(query, args...)
214
215 return err
216}
217
218func AddTrigger(e Execer, trigger Trigger) (int64, error) {
219 args := []any{
220 trigger.Kind,
221 trigger.PushRef,
222 trigger.PushNewSha,
223 trigger.PushOldSha,
224 trigger.PRSourceBranch,
225 trigger.PRTargetBranch,
226 trigger.PRSourceSha,
227 trigger.PRAction,
228 }
229
230 placeholders := make([]string, len(args))
231 for i := range placeholders {
232 placeholders[i] = "?"
233 }
234
235 query := fmt.Sprintf(`insert or ignore into triggers (
236 kind,
237 push_ref,
238 push_new_sha,
239 push_old_sha,
240 pr_source_branch,
241 pr_target_branch,
242 pr_source_sha,
243 pr_action
244 ) values (%s)`, strings.Join(placeholders, ","))
245
246 res, err := e.Exec(query, args...)
247 if err != nil {
248 return 0, err
249 }
250
251 return res.LastInsertId()
252}
253
254func AddPipelineStatus(e Execer, status PipelineStatus) error {
255 args := []any{
256 status.Spindle,
257 status.Rkey,
258 status.PipelineKnot,
259 status.PipelineRkey,
260 status.Workflow,
261 status.Status,
262 status.Error,
263 status.ExitCode,
264 status.Created.Format(time.RFC3339),
265 }
266
267 placeholders := make([]string, len(args))
268 for i := range placeholders {
269 placeholders[i] = "?"
270 }
271
272 query := fmt.Sprintf(`
273 insert or ignore into pipeline_statuses (
274 spindle,
275 rkey,
276 pipeline_knot,
277 pipeline_rkey,
278 workflow,
279 status,
280 error,
281 exit_code,
282 created
283 ) values (%s)
284 `, strings.Join(placeholders, ","))
285
286 _, err := e.Exec(query, args...)
287 return err
288}
289
290// this is a mega query, but the most useful one:
291// get N pipelines, for each one get the latest status of its N workflows
292func GetPipelineStatuses(e Execer, filters ...filter) ([]Pipeline, error) {
293 var conditions []string
294 var args []any
295 for _, filter := range filters {
296 filter.key = "p." + filter.key // the table is aliased in the query to `p`
297 conditions = append(conditions, filter.Condition())
298 args = append(args, filter.Arg()...)
299 }
300
301 whereClause := ""
302 if conditions != nil {
303 whereClause = " where " + strings.Join(conditions, " and ")
304 }
305
306 query := fmt.Sprintf(`
307 select
308 p.id,
309 p.knot,
310 p.rkey,
311 p.repo_owner,
312 p.repo_name,
313 p.sha,
314 p.created,
315 t.id,
316 t.kind,
317 t.push_ref,
318 t.push_new_sha,
319 t.push_old_sha,
320 t.pr_source_branch,
321 t.pr_target_branch,
322 t.pr_source_sha,
323 t.pr_action
324 from
325 pipelines p
326 join
327 triggers t ON p.trigger_id = t.id
328 %s
329 `, whereClause)
330
331 rows, err := e.Query(query, args...)
332 if err != nil {
333 return nil, err
334 }
335 defer rows.Close()
336
337 pipelines := make(map[string]Pipeline)
338 for rows.Next() {
339 var p Pipeline
340 var t Trigger
341 var created string
342
343 err := rows.Scan(
344 &p.Id,
345 &p.Knot,
346 &p.Rkey,
347 &p.RepoOwner,
348 &p.RepoName,
349 &p.Sha,
350 &created,
351 &p.TriggerId,
352 &t.Kind,
353 &t.PushRef,
354 &t.PushNewSha,
355 &t.PushOldSha,
356 &t.PRSourceBranch,
357 &t.PRTargetBranch,
358 &t.PRSourceSha,
359 &t.PRAction,
360 )
361 if err != nil {
362 return nil, err
363 }
364
365 p.Created, err = time.Parse(time.RFC3339, created)
366 if err != nil {
367 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err)
368 }
369
370 t.Id = p.TriggerId
371 p.Trigger = &t
372 p.Statuses = make(map[string]WorkflowStatus)
373
374 k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey)
375 pipelines[k] = p
376 }
377
378 // get all statuses
379 // the where clause here is of the form:
380 //
381 // where (pipeline_knot = k1 and pipeline_rkey = r1)
382 // or (pipeline_knot = k2 and pipeline_rkey = r2)
383 conditions = nil
384 args = nil
385 for _, p := range pipelines {
386 knotFilter := FilterEq("pipeline_knot", p.Knot)
387 rkeyFilter := FilterEq("pipeline_rkey", p.Rkey)
388 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition()))
389 args = append(args, p.Knot)
390 args = append(args, p.Rkey)
391 }
392 whereClause = ""
393 if conditions != nil {
394 whereClause = "where " + strings.Join(conditions, " or ")
395 }
396 query = fmt.Sprintf(`
397 select
398 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code
399 from
400 pipeline_statuses
401 %s
402 `, whereClause)
403
404 rows, err = e.Query(query, args...)
405 if err != nil {
406 return nil, err
407 }
408 defer rows.Close()
409
410 for rows.Next() {
411 var ps PipelineStatus
412 var created string
413
414 err := rows.Scan(
415 &ps.ID,
416 &ps.Spindle,
417 &ps.Rkey,
418 &ps.PipelineKnot,
419 &ps.PipelineRkey,
420 &created,
421 &ps.Workflow,
422 &ps.Status,
423 &ps.Error,
424 &ps.ExitCode,
425 )
426 if err != nil {
427 return nil, err
428 }
429
430 ps.Created, err = time.Parse(time.RFC3339, created)
431 if err != nil {
432 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err)
433 }
434
435 key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey)
436
437 // extract
438 pipeline, ok := pipelines[key]
439 if !ok {
440 continue
441 }
442 statuses, _ := pipeline.Statuses[ps.Workflow]
443 if !ok {
444 pipeline.Statuses[ps.Workflow] = WorkflowStatus{}
445 }
446
447 // append
448 statuses.Data = append(statuses.Data, ps)
449
450 // reassign
451 pipeline.Statuses[ps.Workflow] = statuses
452 pipelines[key] = pipeline
453 }
454
455 var all []Pipeline
456 for _, p := range pipelines {
457 for _, s := range p.Statuses {
458 slices.SortFunc(s.Data, func(a, b PipelineStatus) int {
459 if a.Created.After(b.Created) {
460 return 1
461 }
462 if a.Created.Before(b.Created) {
463 return -1
464 }
465 if a.ID > b.ID {
466 return 1
467 }
468 if a.ID < b.ID {
469 return -1
470 }
471 return 0
472 })
473 }
474 all = append(all, p)
475 }
476
477 // sort pipelines by date
478 slices.SortFunc(all, func(a, b Pipeline) int {
479 if a.Created.After(b.Created) {
480 return -1
481 }
482 return 1
483 })
484
485 return all, nil
486}