1package db
2
3import (
4 "fmt"
5 "slices"
6 "strings"
7 "time"
8
9 "tangled.org/core/appview/models"
10)
11
12func GetPipelines(e Execer, filters ...filter) ([]models.Pipeline, error) {
13 var pipelines []models.Pipeline
14
15 var conditions []string
16 var args []any
17 for _, filter := range filters {
18 conditions = append(conditions, filter.Condition())
19 args = append(args, filter.Arg()...)
20 }
21
22 whereClause := ""
23 if conditions != nil {
24 whereClause = " where " + strings.Join(conditions, " and ")
25 }
26
27 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause)
28
29 rows, err := e.Query(query, args...)
30
31 if err != nil {
32 return nil, err
33 }
34 defer rows.Close()
35
36 for rows.Next() {
37 var pipeline models.Pipeline
38 var createdAt string
39 err = rows.Scan(
40 &pipeline.Id,
41 &pipeline.Rkey,
42 &pipeline.Knot,
43 &pipeline.RepoOwner,
44 &pipeline.RepoName,
45 &pipeline.Sha,
46 &createdAt,
47 )
48 if err != nil {
49 return nil, err
50 }
51
52 if t, err := time.Parse(time.RFC3339, createdAt); err == nil {
53 pipeline.Created = t
54 }
55
56 pipelines = append(pipelines, pipeline)
57 }
58
59 if err = rows.Err(); err != nil {
60 return nil, err
61 }
62
63 return pipelines, nil
64}
65
66func AddPipeline(e Execer, pipeline models.Pipeline) error {
67 args := []any{
68 pipeline.Rkey,
69 pipeline.Knot,
70 pipeline.RepoOwner,
71 pipeline.RepoName,
72 pipeline.TriggerId,
73 pipeline.Sha,
74 }
75
76 placeholders := make([]string, len(args))
77 for i := range placeholders {
78 placeholders[i] = "?"
79 }
80
81 query := fmt.Sprintf(`
82 insert or ignore into pipelines (
83 rkey,
84 knot,
85 repo_owner,
86 repo_name,
87 trigger_id,
88 sha
89 ) values (%s)
90 `, strings.Join(placeholders, ","))
91
92 _, err := e.Exec(query, args...)
93
94 return err
95}
96
97func AddTrigger(e Execer, trigger models.Trigger) (int64, error) {
98 args := []any{
99 trigger.Kind,
100 trigger.PushRef,
101 trigger.PushNewSha,
102 trigger.PushOldSha,
103 trigger.PRSourceBranch,
104 trigger.PRTargetBranch,
105 trigger.PRSourceSha,
106 trigger.PRAction,
107 }
108
109 placeholders := make([]string, len(args))
110 for i := range placeholders {
111 placeholders[i] = "?"
112 }
113
114 query := fmt.Sprintf(`insert or ignore into triggers (
115 kind,
116 push_ref,
117 push_new_sha,
118 push_old_sha,
119 pr_source_branch,
120 pr_target_branch,
121 pr_source_sha,
122 pr_action
123 ) values (%s)`, strings.Join(placeholders, ","))
124
125 res, err := e.Exec(query, args...)
126 if err != nil {
127 return 0, err
128 }
129
130 return res.LastInsertId()
131}
132
133func AddPipelineStatus(e Execer, status models.PipelineStatus) error {
134 args := []any{
135 status.Spindle,
136 status.Rkey,
137 status.PipelineKnot,
138 status.PipelineRkey,
139 status.Workflow,
140 status.Status,
141 status.Error,
142 status.ExitCode,
143 status.Created.Format(time.RFC3339),
144 }
145
146 placeholders := make([]string, len(args))
147 for i := range placeholders {
148 placeholders[i] = "?"
149 }
150
151 query := fmt.Sprintf(`
152 insert or ignore into pipeline_statuses (
153 spindle,
154 rkey,
155 pipeline_knot,
156 pipeline_rkey,
157 workflow,
158 status,
159 error,
160 exit_code,
161 created
162 ) values (%s)
163 `, strings.Join(placeholders, ","))
164
165 _, err := e.Exec(query, args...)
166 return err
167}
168
169// this is a mega query, but the most useful one:
170// get N pipelines, for each one get the latest status of its N workflows
171func GetPipelineStatuses(e Execer, filters ...filter) ([]models.Pipeline, error) {
172 var conditions []string
173 var args []any
174 for _, filter := range filters {
175 filter.key = "p." + filter.key // the table is aliased in the query to `p`
176 conditions = append(conditions, filter.Condition())
177 args = append(args, filter.Arg()...)
178 }
179
180 whereClause := ""
181 if conditions != nil {
182 whereClause = " where " + strings.Join(conditions, " and ")
183 }
184
185 query := fmt.Sprintf(`
186 select
187 p.id,
188 p.knot,
189 p.rkey,
190 p.repo_owner,
191 p.repo_name,
192 p.sha,
193 p.created,
194 t.id,
195 t.kind,
196 t.push_ref,
197 t.push_new_sha,
198 t.push_old_sha,
199 t.pr_source_branch,
200 t.pr_target_branch,
201 t.pr_source_sha,
202 t.pr_action
203 from
204 pipelines p
205 join
206 triggers t ON p.trigger_id = t.id
207 %s
208 `, whereClause)
209
210 rows, err := e.Query(query, args...)
211 if err != nil {
212 return nil, err
213 }
214 defer rows.Close()
215
216 pipelines := make(map[string]models.Pipeline)
217 for rows.Next() {
218 var p models.Pipeline
219 var t models.Trigger
220 var created string
221
222 err := rows.Scan(
223 &p.Id,
224 &p.Knot,
225 &p.Rkey,
226 &p.RepoOwner,
227 &p.RepoName,
228 &p.Sha,
229 &created,
230 &p.TriggerId,
231 &t.Kind,
232 &t.PushRef,
233 &t.PushNewSha,
234 &t.PushOldSha,
235 &t.PRSourceBranch,
236 &t.PRTargetBranch,
237 &t.PRSourceSha,
238 &t.PRAction,
239 )
240 if err != nil {
241 return nil, err
242 }
243
244 p.Created, err = time.Parse(time.RFC3339, created)
245 if err != nil {
246 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err)
247 }
248
249 t.Id = p.TriggerId
250 p.Trigger = &t
251 p.Statuses = make(map[string]models.WorkflowStatus)
252
253 k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey)
254 pipelines[k] = p
255 }
256
257 // get all statuses
258 // the where clause here is of the form:
259 //
260 // where (pipeline_knot = k1 and pipeline_rkey = r1)
261 // or (pipeline_knot = k2 and pipeline_rkey = r2)
262 conditions = nil
263 args = nil
264 for _, p := range pipelines {
265 knotFilter := FilterEq("pipeline_knot", p.Knot)
266 rkeyFilter := FilterEq("pipeline_rkey", p.Rkey)
267 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition()))
268 args = append(args, p.Knot)
269 args = append(args, p.Rkey)
270 }
271 whereClause = ""
272 if conditions != nil {
273 whereClause = "where " + strings.Join(conditions, " or ")
274 }
275 query = fmt.Sprintf(`
276 select
277 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code
278 from
279 pipeline_statuses
280 %s
281 `, whereClause)
282
283 rows, err = e.Query(query, args...)
284 if err != nil {
285 return nil, err
286 }
287 defer rows.Close()
288
289 for rows.Next() {
290 var ps models.PipelineStatus
291 var created string
292
293 err := rows.Scan(
294 &ps.ID,
295 &ps.Spindle,
296 &ps.Rkey,
297 &ps.PipelineKnot,
298 &ps.PipelineRkey,
299 &created,
300 &ps.Workflow,
301 &ps.Status,
302 &ps.Error,
303 &ps.ExitCode,
304 )
305 if err != nil {
306 return nil, err
307 }
308
309 ps.Created, err = time.Parse(time.RFC3339, created)
310 if err != nil {
311 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err)
312 }
313
314 key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey)
315
316 // extract
317 pipeline, ok := pipelines[key]
318 if !ok {
319 continue
320 }
321 statuses, _ := pipeline.Statuses[ps.Workflow]
322 if !ok {
323 pipeline.Statuses[ps.Workflow] = models.WorkflowStatus{}
324 }
325
326 // append
327 statuses.Data = append(statuses.Data, ps)
328
329 // reassign
330 pipeline.Statuses[ps.Workflow] = statuses
331 pipelines[key] = pipeline
332 }
333
334 var all []models.Pipeline
335 for _, p := range pipelines {
336 for _, s := range p.Statuses {
337 slices.SortFunc(s.Data, func(a, b models.PipelineStatus) int {
338 if a.Created.After(b.Created) {
339 return 1
340 }
341 if a.Created.Before(b.Created) {
342 return -1
343 }
344 if a.ID > b.ID {
345 return 1
346 }
347 if a.ID < b.ID {
348 return -1
349 }
350 return 0
351 })
352 }
353 all = append(all, p)
354 }
355
356 // sort pipelines by date
357 slices.SortFunc(all, func(a, b models.Pipeline) int {
358 if a.Created.After(b.Created) {
359 return -1
360 }
361 return 1
362 })
363
364 return all, nil
365}