1package db
2
3import (
4 "fmt"
5 "slices"
6 "strings"
7 "time"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "github.com/go-git/go-git/v5/plumbing"
11 spindle "tangled.sh/tangled.sh/core/spindle/models"
12)
13
14type Pipeline struct {
15 Id int
16 Rkey string
17 Knot string
18 RepoOwner syntax.DID
19 RepoName string
20 TriggerId int
21 Sha string
22 Created time.Time
23
24 // populate when querying for reverse mappings
25 Trigger *Trigger
26 Statuses map[string]WorkflowStatus
27}
28
29type WorkflowStatus struct {
30 data []PipelineStatus
31}
32
33func (w WorkflowStatus) Latest() PipelineStatus {
34 return w.data[len(w.data)-1]
35}
36
37// time taken by this workflow to reach an "end state"
38func (w WorkflowStatus) TimeTaken() time.Duration {
39 var start, end *time.Time
40 for _, s := range w.data {
41 if s.Status.IsStart() {
42 start = &s.Created
43 }
44 if s.Status.IsFinish() {
45 end = &s.Created
46 }
47 }
48
49 if start != nil && end != nil && end.After(*start) {
50 return end.Sub(*start)
51 }
52
53 return 0
54}
55
56func (p Pipeline) Counts() map[string]int {
57 m := make(map[string]int)
58 for _, w := range p.Statuses {
59 m[w.Latest().Status.String()] += 1
60 }
61 return m
62}
63
64func (p Pipeline) TimeTaken() time.Duration {
65 var s time.Duration
66 for _, w := range p.Statuses {
67 s += w.TimeTaken()
68 }
69 return s
70}
71
72func (p Pipeline) Workflows() []string {
73 var ws []string
74 for v := range p.Statuses {
75 ws = append(ws, v)
76 }
77 slices.Sort(ws)
78 return ws
79}
80
81type Trigger struct {
82 Id int
83 Kind string
84
85 // push trigger fields
86 PushRef *string
87 PushNewSha *string
88 PushOldSha *string
89
90 // pull request trigger fields
91 PRSourceBranch *string
92 PRTargetBranch *string
93 PRSourceSha *string
94 PRAction *string
95}
96
97func (t *Trigger) IsPush() bool {
98 return t != nil && t.Kind == "push"
99}
100
101func (t *Trigger) IsPullRequest() bool {
102 return t != nil && t.Kind == "pull_request"
103}
104
105func (t *Trigger) TargetRef() string {
106 if t.IsPush() {
107 return plumbing.ReferenceName(*t.PushRef).Short()
108 } else if t.IsPullRequest() {
109 return *t.PRTargetBranch
110 }
111
112 return ""
113}
114
115type PipelineStatus struct {
116 ID int
117 Spindle string
118 Rkey string
119 PipelineKnot string
120 PipelineRkey string
121 Created time.Time
122 Workflow string
123 Status spindle.StatusKind
124 Error *string
125 ExitCode int
126}
127
128func GetPipelines(e Execer, filters ...filter) ([]Pipeline, error) {
129 var pipelines []Pipeline
130
131 var conditions []string
132 var args []any
133 for _, filter := range filters {
134 conditions = append(conditions, filter.Condition())
135 args = append(args, filter.Arg()...)
136 }
137
138 whereClause := ""
139 if conditions != nil {
140 whereClause = " where " + strings.Join(conditions, " and ")
141 }
142
143 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause)
144
145 rows, err := e.Query(query, args...)
146
147 if err != nil {
148 return nil, err
149 }
150 defer rows.Close()
151
152 for rows.Next() {
153 var pipeline Pipeline
154 var createdAt string
155 err = rows.Scan(
156 &pipeline.Id,
157 &pipeline.Rkey,
158 &pipeline.Knot,
159 &pipeline.RepoOwner,
160 &pipeline.RepoName,
161 &pipeline.Sha,
162 &createdAt,
163 )
164 if err != nil {
165 return nil, err
166 }
167
168 if t, err := time.Parse(time.RFC3339, createdAt); err == nil {
169 pipeline.Created = t
170 }
171
172 pipelines = append(pipelines, pipeline)
173 }
174
175 if err = rows.Err(); err != nil {
176 return nil, err
177 }
178
179 return pipelines, nil
180}
181
182func AddPipeline(e Execer, pipeline Pipeline) error {
183 args := []any{
184 pipeline.Rkey,
185 pipeline.Knot,
186 pipeline.RepoOwner,
187 pipeline.RepoName,
188 pipeline.TriggerId,
189 pipeline.Sha,
190 }
191
192 placeholders := make([]string, len(args))
193 for i := range placeholders {
194 placeholders[i] = "?"
195 }
196
197 query := fmt.Sprintf(`
198 insert or ignore into pipelines (
199 rkey,
200 knot,
201 repo_owner,
202 repo_name,
203 trigger_id,
204 sha
205 ) values (%s)
206 `, strings.Join(placeholders, ","))
207
208 _, err := e.Exec(query, args...)
209
210 return err
211}
212
213func AddTrigger(e Execer, trigger Trigger) (int64, error) {
214 args := []any{
215 trigger.Kind,
216 trigger.PushRef,
217 trigger.PushNewSha,
218 trigger.PushOldSha,
219 trigger.PRSourceBranch,
220 trigger.PRTargetBranch,
221 trigger.PRSourceSha,
222 trigger.PRAction,
223 }
224
225 placeholders := make([]string, len(args))
226 for i := range placeholders {
227 placeholders[i] = "?"
228 }
229
230 query := fmt.Sprintf(`insert or ignore into triggers (
231 kind,
232 push_ref,
233 push_new_sha,
234 push_old_sha,
235 pr_source_branch,
236 pr_target_branch,
237 pr_source_sha,
238 pr_action
239 ) values (%s)`, strings.Join(placeholders, ","))
240
241 res, err := e.Exec(query, args...)
242 if err != nil {
243 return 0, err
244 }
245
246 return res.LastInsertId()
247}
248
249func AddPipelineStatus(e Execer, status PipelineStatus) error {
250 args := []any{
251 status.Spindle,
252 status.Rkey,
253 status.PipelineKnot,
254 status.PipelineRkey,
255 status.Workflow,
256 status.Status,
257 status.Error,
258 status.ExitCode,
259 }
260
261 placeholders := make([]string, len(args))
262 for i := range placeholders {
263 placeholders[i] = "?"
264 }
265
266 query := fmt.Sprintf(`
267 insert or ignore into pipeline_statuses (
268 spindle,
269 rkey,
270 pipeline_knot,
271 pipeline_rkey,
272 workflow,
273 status,
274 error,
275 exit_code
276 ) values (%s)
277 `, strings.Join(placeholders, ","))
278
279 _, err := e.Exec(query, args...)
280 return err
281}
282
283// this is a mega query, but the most useful one:
284// get N pipelines, for each one get the latest status of its N workflows
285func GetPipelineStatuses(e Execer, filters ...filter) ([]Pipeline, error) {
286 var conditions []string
287 var args []any
288 for _, filter := range filters {
289 filter.key = "p." + filter.key // the table is aliased in the query to `p`
290 conditions = append(conditions, filter.Condition())
291 args = append(args, filter.Arg()...)
292 }
293
294 whereClause := ""
295 if conditions != nil {
296 whereClause = " where " + strings.Join(conditions, " and ")
297 }
298
299 query := fmt.Sprintf(`
300 select
301 p.id,
302 p.knot,
303 p.rkey,
304 p.repo_owner,
305 p.repo_name,
306 p.sha,
307 p.created,
308 t.id,
309 t.kind,
310 t.push_ref,
311 t.push_new_sha,
312 t.push_old_sha,
313 t.pr_source_branch,
314 t.pr_target_branch,
315 t.pr_source_sha,
316 t.pr_action
317 from
318 pipelines p
319 join
320 triggers t ON p.trigger_id = t.id
321 %s
322 `, whereClause)
323
324 rows, err := e.Query(query, args...)
325 if err != nil {
326 return nil, err
327 }
328 defer rows.Close()
329
330 pipelines := make(map[string]Pipeline)
331 for rows.Next() {
332 var p Pipeline
333 var t Trigger
334 var created string
335
336 err := rows.Scan(
337 &p.Id,
338 &p.Knot,
339 &p.Rkey,
340 &p.RepoOwner,
341 &p.RepoName,
342 &p.Sha,
343 &created,
344 &p.TriggerId,
345 &t.Kind,
346 &t.PushRef,
347 &t.PushNewSha,
348 &t.PushOldSha,
349 &t.PRSourceBranch,
350 &t.PRTargetBranch,
351 &t.PRSourceSha,
352 &t.PRAction,
353 )
354 if err != nil {
355 return nil, err
356 }
357
358 // Parse created time manually
359 p.Created, err = time.Parse(time.RFC3339, created)
360 if err != nil {
361 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err)
362 }
363
364 // Link trigger to pipeline
365 t.Id = p.TriggerId
366 p.Trigger = &t
367 p.Statuses = make(map[string]WorkflowStatus)
368
369 k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey)
370 pipelines[k] = p
371 }
372
373 // get all statuses
374 // the where clause here is of the form:
375 //
376 // where (pipeline_knot = k1 and pipeline_rkey = r1)
377 // or (pipeline_knot = k2 and pipeline_rkey = r2)
378 conditions = nil
379 args = nil
380 for _, p := range pipelines {
381 knotFilter := FilterEq("pipeline_knot", p.Knot)
382 rkeyFilter := FilterEq("pipeline_rkey", p.Rkey)
383 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition()))
384 args = append(args, p.Knot)
385 args = append(args, p.Rkey)
386 }
387 whereClause = ""
388 if conditions != nil {
389 whereClause = "where " + strings.Join(conditions, " or ")
390 }
391 query = fmt.Sprintf(`
392 select
393 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code
394 from
395 pipeline_statuses
396 %s
397 `, whereClause)
398
399 rows, err = e.Query(query, args...)
400 if err != nil {
401 return nil, err
402 }
403 defer rows.Close()
404
405 for rows.Next() {
406 var ps PipelineStatus
407 var created string
408
409 err := rows.Scan(
410 &ps.ID,
411 &ps.Spindle,
412 &ps.Rkey,
413 &ps.PipelineKnot,
414 &ps.PipelineRkey,
415 &created,
416 &ps.Workflow,
417 &ps.Status,
418 &ps.Error,
419 &ps.ExitCode,
420 )
421 if err != nil {
422 return nil, err
423 }
424
425 ps.Created, err = time.Parse(time.RFC3339, created)
426 if err != nil {
427 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err)
428 }
429
430 key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey)
431
432 // extract
433 pipeline, ok := pipelines[key]
434 if !ok {
435 continue
436 }
437 statuses, _ := pipeline.Statuses[ps.Workflow]
438 if !ok {
439 pipeline.Statuses[ps.Workflow] = WorkflowStatus{}
440 }
441
442 // append
443 statuses.data = append(statuses.data, ps)
444
445 // reassign
446 pipeline.Statuses[ps.Workflow] = statuses
447 pipelines[key] = pipeline
448 }
449
450 var all []Pipeline
451 for _, p := range pipelines {
452 for _, s := range p.Statuses {
453 slices.SortFunc(s.data, func(a, b PipelineStatus) int {
454 if a.Created.After(b.Created) {
455 return 1
456 }
457 return -1
458 })
459 }
460 all = append(all, p)
461 }
462
463 // sort pipelines by date
464 slices.SortFunc(all, func(a, b Pipeline) int {
465 if a.Created.After(b.Created) {
466 return -1
467 }
468 return 1
469 })
470
471 return all, nil
472}