1package db
2
3import (
4 "fmt"
5 "slices"
6 "strings"
7 "time"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "github.com/go-git/go-git/v5/plumbing"
11 spindle "tangled.sh/tangled.sh/core/spindle/models"
12 "tangled.sh/tangled.sh/core/workflow"
13)
14
15type Pipeline struct {
16 Id int
17 Rkey string
18 Knot string
19 RepoOwner syntax.DID
20 RepoName string
21 TriggerId int
22 Sha string
23 Created time.Time
24
25 // populate when querying for reverse mappings
26 Trigger *Trigger
27 Statuses map[string]WorkflowStatus
28}
29
30type WorkflowStatus struct {
31 Data []PipelineStatus
32}
33
34func (w WorkflowStatus) Latest() PipelineStatus {
35 return w.Data[len(w.Data)-1]
36}
37
38// time taken by this workflow to reach an "end state"
39func (w WorkflowStatus) TimeTaken() time.Duration {
40 var start, end *time.Time
41 for _, s := range w.Data {
42 if s.Status.IsStart() {
43 start = &s.Created
44 }
45 if s.Status.IsFinish() {
46 end = &s.Created
47 }
48 }
49
50 if start != nil && end != nil && end.After(*start) {
51 return end.Sub(*start)
52 }
53
54 return 0
55}
56
57func (p Pipeline) Counts() map[string]int {
58 m := make(map[string]int)
59 for _, w := range p.Statuses {
60 m[w.Latest().Status.String()] += 1
61 }
62 return m
63}
64
65func (p Pipeline) TimeTaken() time.Duration {
66 var s time.Duration
67 for _, w := range p.Statuses {
68 s += w.TimeTaken()
69 }
70 return s
71}
72
73func (p Pipeline) Workflows() []string {
74 var ws []string
75 for v := range p.Statuses {
76 ws = append(ws, v)
77 }
78 slices.Sort(ws)
79 return ws
80}
81
82// if we know that a spindle has picked up this pipeline, then it is Responding
83func (p Pipeline) IsResponding() bool {
84 return len(p.Statuses) != 0
85}
86
87type Trigger struct {
88 Id int
89 Kind workflow.TriggerKind
90
91 // push trigger fields
92 PushRef *string
93 PushNewSha *string
94 PushOldSha *string
95
96 // pull request trigger fields
97 PRSourceBranch *string
98 PRTargetBranch *string
99 PRSourceSha *string
100 PRAction *string
101}
102
103func (t *Trigger) IsPush() bool {
104 return t != nil && t.Kind == workflow.TriggerKindPush
105}
106
107func (t *Trigger) IsPullRequest() bool {
108 return t != nil && t.Kind == workflow.TriggerKindPullRequest
109}
110
111func (t *Trigger) TargetRef() string {
112 if t.IsPush() {
113 return plumbing.ReferenceName(*t.PushRef).Short()
114 } else if t.IsPullRequest() {
115 return *t.PRTargetBranch
116 }
117
118 return ""
119}
120
121type PipelineStatus struct {
122 ID int
123 Spindle string
124 Rkey string
125 PipelineKnot string
126 PipelineRkey string
127 Created time.Time
128 Workflow string
129 Status spindle.StatusKind
130 Error *string
131 ExitCode int
132}
133
134func GetPipelines(e Execer, filters ...filter) ([]Pipeline, error) {
135 var pipelines []Pipeline
136
137 var conditions []string
138 var args []any
139 for _, filter := range filters {
140 conditions = append(conditions, filter.Condition())
141 args = append(args, filter.Arg()...)
142 }
143
144 whereClause := ""
145 if conditions != nil {
146 whereClause = " where " + strings.Join(conditions, " and ")
147 }
148
149 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause)
150
151 rows, err := e.Query(query, args...)
152
153 if err != nil {
154 return nil, err
155 }
156 defer rows.Close()
157
158 for rows.Next() {
159 var pipeline Pipeline
160 var createdAt string
161 err = rows.Scan(
162 &pipeline.Id,
163 &pipeline.Rkey,
164 &pipeline.Knot,
165 &pipeline.RepoOwner,
166 &pipeline.RepoName,
167 &pipeline.Sha,
168 &createdAt,
169 )
170 if err != nil {
171 return nil, err
172 }
173
174 if t, err := time.Parse(time.RFC3339, createdAt); err == nil {
175 pipeline.Created = t
176 }
177
178 pipelines = append(pipelines, pipeline)
179 }
180
181 if err = rows.Err(); err != nil {
182 return nil, err
183 }
184
185 return pipelines, nil
186}
187
188func AddPipeline(e Execer, pipeline Pipeline) error {
189 args := []any{
190 pipeline.Rkey,
191 pipeline.Knot,
192 pipeline.RepoOwner,
193 pipeline.RepoName,
194 pipeline.TriggerId,
195 pipeline.Sha,
196 }
197
198 placeholders := make([]string, len(args))
199 for i := range placeholders {
200 placeholders[i] = "?"
201 }
202
203 query := fmt.Sprintf(`
204 insert or ignore into pipelines (
205 rkey,
206 knot,
207 repo_owner,
208 repo_name,
209 trigger_id,
210 sha
211 ) values (%s)
212 `, strings.Join(placeholders, ","))
213
214 _, err := e.Exec(query, args...)
215
216 return err
217}
218
219func AddTrigger(e Execer, trigger Trigger) (int64, error) {
220 args := []any{
221 trigger.Kind,
222 trigger.PushRef,
223 trigger.PushNewSha,
224 trigger.PushOldSha,
225 trigger.PRSourceBranch,
226 trigger.PRTargetBranch,
227 trigger.PRSourceSha,
228 trigger.PRAction,
229 }
230
231 placeholders := make([]string, len(args))
232 for i := range placeholders {
233 placeholders[i] = "?"
234 }
235
236 query := fmt.Sprintf(`insert or ignore into triggers (
237 kind,
238 push_ref,
239 push_new_sha,
240 push_old_sha,
241 pr_source_branch,
242 pr_target_branch,
243 pr_source_sha,
244 pr_action
245 ) values (%s)`, strings.Join(placeholders, ","))
246
247 res, err := e.Exec(query, args...)
248 if err != nil {
249 return 0, err
250 }
251
252 return res.LastInsertId()
253}
254
255func AddPipelineStatus(e Execer, status PipelineStatus) error {
256 args := []any{
257 status.Spindle,
258 status.Rkey,
259 status.PipelineKnot,
260 status.PipelineRkey,
261 status.Workflow,
262 status.Status,
263 status.Error,
264 status.ExitCode,
265 status.Created.Format(time.RFC3339),
266 }
267
268 placeholders := make([]string, len(args))
269 for i := range placeholders {
270 placeholders[i] = "?"
271 }
272
273 query := fmt.Sprintf(`
274 insert or ignore into pipeline_statuses (
275 spindle,
276 rkey,
277 pipeline_knot,
278 pipeline_rkey,
279 workflow,
280 status,
281 error,
282 exit_code,
283 created
284 ) values (%s)
285 `, strings.Join(placeholders, ","))
286
287 _, err := e.Exec(query, args...)
288 return err
289}
290
291// this is a mega query, but the most useful one:
292// get N pipelines, for each one get the latest status of its N workflows
293func GetPipelineStatuses(e Execer, filters ...filter) ([]Pipeline, error) {
294 var conditions []string
295 var args []any
296 for _, filter := range filters {
297 filter.key = "p." + filter.key // the table is aliased in the query to `p`
298 conditions = append(conditions, filter.Condition())
299 args = append(args, filter.Arg()...)
300 }
301
302 whereClause := ""
303 if conditions != nil {
304 whereClause = " where " + strings.Join(conditions, " and ")
305 }
306
307 query := fmt.Sprintf(`
308 select
309 p.id,
310 p.knot,
311 p.rkey,
312 p.repo_owner,
313 p.repo_name,
314 p.sha,
315 p.created,
316 t.id,
317 t.kind,
318 t.push_ref,
319 t.push_new_sha,
320 t.push_old_sha,
321 t.pr_source_branch,
322 t.pr_target_branch,
323 t.pr_source_sha,
324 t.pr_action
325 from
326 pipelines p
327 join
328 triggers t ON p.trigger_id = t.id
329 %s
330 `, whereClause)
331
332 rows, err := e.Query(query, args...)
333 if err != nil {
334 return nil, err
335 }
336 defer rows.Close()
337
338 pipelines := make(map[string]Pipeline)
339 for rows.Next() {
340 var p Pipeline
341 var t Trigger
342 var created string
343
344 err := rows.Scan(
345 &p.Id,
346 &p.Knot,
347 &p.Rkey,
348 &p.RepoOwner,
349 &p.RepoName,
350 &p.Sha,
351 &created,
352 &p.TriggerId,
353 &t.Kind,
354 &t.PushRef,
355 &t.PushNewSha,
356 &t.PushOldSha,
357 &t.PRSourceBranch,
358 &t.PRTargetBranch,
359 &t.PRSourceSha,
360 &t.PRAction,
361 )
362 if err != nil {
363 return nil, err
364 }
365
366 p.Created, err = time.Parse(time.RFC3339, created)
367 if err != nil {
368 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err)
369 }
370
371 t.Id = p.TriggerId
372 p.Trigger = &t
373 p.Statuses = make(map[string]WorkflowStatus)
374
375 k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey)
376 pipelines[k] = p
377 }
378
379 // get all statuses
380 // the where clause here is of the form:
381 //
382 // where (pipeline_knot = k1 and pipeline_rkey = r1)
383 // or (pipeline_knot = k2 and pipeline_rkey = r2)
384 conditions = nil
385 args = nil
386 for _, p := range pipelines {
387 knotFilter := FilterEq("pipeline_knot", p.Knot)
388 rkeyFilter := FilterEq("pipeline_rkey", p.Rkey)
389 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition()))
390 args = append(args, p.Knot)
391 args = append(args, p.Rkey)
392 }
393 whereClause = ""
394 if conditions != nil {
395 whereClause = "where " + strings.Join(conditions, " or ")
396 }
397 query = fmt.Sprintf(`
398 select
399 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code
400 from
401 pipeline_statuses
402 %s
403 `, whereClause)
404
405 rows, err = e.Query(query, args...)
406 if err != nil {
407 return nil, err
408 }
409 defer rows.Close()
410
411 for rows.Next() {
412 var ps PipelineStatus
413 var created string
414
415 err := rows.Scan(
416 &ps.ID,
417 &ps.Spindle,
418 &ps.Rkey,
419 &ps.PipelineKnot,
420 &ps.PipelineRkey,
421 &created,
422 &ps.Workflow,
423 &ps.Status,
424 &ps.Error,
425 &ps.ExitCode,
426 )
427 if err != nil {
428 return nil, err
429 }
430
431 ps.Created, err = time.Parse(time.RFC3339, created)
432 if err != nil {
433 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err)
434 }
435
436 key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey)
437
438 // extract
439 pipeline, ok := pipelines[key]
440 if !ok {
441 continue
442 }
443 statuses, _ := pipeline.Statuses[ps.Workflow]
444 if !ok {
445 pipeline.Statuses[ps.Workflow] = WorkflowStatus{}
446 }
447
448 // append
449 statuses.Data = append(statuses.Data, ps)
450
451 // reassign
452 pipeline.Statuses[ps.Workflow] = statuses
453 pipelines[key] = pipeline
454 }
455
456 var all []Pipeline
457 for _, p := range pipelines {
458 for _, s := range p.Statuses {
459 slices.SortFunc(s.Data, func(a, b PipelineStatus) int {
460 if a.Created.After(b.Created) {
461 return 1
462 }
463 if a.Created.Before(b.Created) {
464 return -1
465 }
466 if a.ID > b.ID {
467 return 1
468 }
469 if a.ID < b.ID {
470 return -1
471 }
472 return 0
473 })
474 }
475 all = append(all, p)
476 }
477
478 // sort pipelines by date
479 slices.SortFunc(all, func(a, b Pipeline) int {
480 if a.Created.After(b.Created) {
481 return -1
482 }
483 return 1
484 })
485
486 return all, nil
487}