forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db
2
3import (
4 "fmt"
5 "slices"
6 "strings"
7 "time"
8
9 "tangled.org/core/appview/models"
10 "tangled.org/core/orm"
11)
12
13func GetPipelines(e Execer, filters ...orm.Filter) ([]models.Pipeline, error) {
14 var pipelines []models.Pipeline
15
16 var conditions []string
17 var args []any
18 for _, filter := range filters {
19 conditions = append(conditions, filter.Condition())
20 args = append(args, filter.Arg()...)
21 }
22
23 whereClause := ""
24 if conditions != nil {
25 whereClause = " where " + strings.Join(conditions, " and ")
26 }
27
28 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause)
29
30 rows, err := e.Query(query, args...)
31
32 if err != nil {
33 return nil, err
34 }
35 defer rows.Close()
36
37 for rows.Next() {
38 var pipeline models.Pipeline
39 var createdAt string
40 err = rows.Scan(
41 &pipeline.Id,
42 &pipeline.Rkey,
43 &pipeline.Knot,
44 &pipeline.RepoOwner,
45 &pipeline.RepoName,
46 &pipeline.Sha,
47 &createdAt,
48 )
49 if err != nil {
50 return nil, err
51 }
52
53 if t, err := time.Parse(time.RFC3339, createdAt); err == nil {
54 pipeline.Created = t
55 }
56
57 pipelines = append(pipelines, pipeline)
58 }
59
60 if err = rows.Err(); err != nil {
61 return nil, err
62 }
63
64 return pipelines, nil
65}
66
67func AddPipeline(e Execer, pipeline models.Pipeline) error {
68 args := []any{
69 pipeline.Rkey,
70 pipeline.Knot,
71 pipeline.RepoOwner,
72 pipeline.RepoName,
73 pipeline.TriggerId,
74 pipeline.Sha,
75 }
76
77 placeholders := make([]string, len(args))
78 for i := range placeholders {
79 placeholders[i] = "?"
80 }
81
82 query := fmt.Sprintf(`
83 insert or ignore into pipelines (
84 rkey,
85 knot,
86 repo_owner,
87 repo_name,
88 trigger_id,
89 sha
90 ) values (%s)
91 `, strings.Join(placeholders, ","))
92
93 _, err := e.Exec(query, args...)
94
95 return err
96}
97
98func AddTrigger(e Execer, trigger models.Trigger) (int64, error) {
99 args := []any{
100 trigger.Kind,
101 trigger.PushRef,
102 trigger.PushNewSha,
103 trigger.PushOldSha,
104 trigger.PRSourceBranch,
105 trigger.PRTargetBranch,
106 trigger.PRSourceSha,
107 trigger.PRAction,
108 }
109
110 placeholders := make([]string, len(args))
111 for i := range placeholders {
112 placeholders[i] = "?"
113 }
114
115 query := fmt.Sprintf(`insert or ignore into triggers (
116 kind,
117 push_ref,
118 push_new_sha,
119 push_old_sha,
120 pr_source_branch,
121 pr_target_branch,
122 pr_source_sha,
123 pr_action
124 ) values (%s)`, strings.Join(placeholders, ","))
125
126 res, err := e.Exec(query, args...)
127 if err != nil {
128 return 0, err
129 }
130
131 return res.LastInsertId()
132}
133
134func AddPipelineStatus(e Execer, status models.PipelineStatus) error {
135 args := []any{
136 status.Spindle,
137 status.Rkey,
138 status.PipelineKnot,
139 status.PipelineRkey,
140 status.Workflow,
141 status.Status,
142 status.Error,
143 status.ExitCode,
144 status.Created.Format(time.RFC3339),
145 }
146
147 placeholders := make([]string, len(args))
148 for i := range placeholders {
149 placeholders[i] = "?"
150 }
151
152 query := fmt.Sprintf(`
153 insert or ignore into pipeline_statuses (
154 spindle,
155 rkey,
156 pipeline_knot,
157 pipeline_rkey,
158 workflow,
159 status,
160 error,
161 exit_code,
162 created
163 ) values (%s)
164 `, strings.Join(placeholders, ","))
165
166 _, err := e.Exec(query, args...)
167 return err
168}
169
170// this is a mega query, but the most useful one:
171// get N pipelines, for each one get the latest status of its N workflows
172func GetPipelineStatuses(e Execer, limit int, filters ...orm.Filter) ([]models.Pipeline, error) {
173 var conditions []string
174 var args []any
175 for _, filter := range filters {
176 filter.Key = "p." + filter.Key // the table is aliased in the query to `p`
177 conditions = append(conditions, filter.Condition())
178 args = append(args, filter.Arg()...)
179 }
180
181 whereClause := ""
182 if conditions != nil {
183 whereClause = " where " + strings.Join(conditions, " and ")
184 }
185
186 query := fmt.Sprintf(`
187 select
188 p.id,
189 p.knot,
190 p.rkey,
191 p.repo_owner,
192 p.repo_name,
193 p.sha,
194 p.created,
195 t.id,
196 t.kind,
197 t.push_ref,
198 t.push_new_sha,
199 t.push_old_sha,
200 t.pr_source_branch,
201 t.pr_target_branch,
202 t.pr_source_sha,
203 t.pr_action
204 from
205 pipelines p
206 join
207 triggers t ON p.trigger_id = t.id
208 %s
209 order by p.created desc
210 limit %d
211 `, whereClause, limit)
212
213 rows, err := e.Query(query, args...)
214 if err != nil {
215 return nil, err
216 }
217 defer rows.Close()
218
219 pipelines := make(map[string]models.Pipeline)
220 for rows.Next() {
221 var p models.Pipeline
222 var t models.Trigger
223 var created string
224
225 err := rows.Scan(
226 &p.Id,
227 &p.Knot,
228 &p.Rkey,
229 &p.RepoOwner,
230 &p.RepoName,
231 &p.Sha,
232 &created,
233 &p.TriggerId,
234 &t.Kind,
235 &t.PushRef,
236 &t.PushNewSha,
237 &t.PushOldSha,
238 &t.PRSourceBranch,
239 &t.PRTargetBranch,
240 &t.PRSourceSha,
241 &t.PRAction,
242 )
243 if err != nil {
244 return nil, err
245 }
246
247 p.Created, err = time.Parse(time.RFC3339, created)
248 if err != nil {
249 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err)
250 }
251
252 t.Id = p.TriggerId
253 p.Trigger = &t
254 p.Statuses = make(map[string]models.WorkflowStatus)
255
256 k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey)
257 pipelines[k] = p
258 }
259
260 // get all statuses
261 // the where clause here is of the form:
262 //
263 // where (pipeline_knot = k1 and pipeline_rkey = r1)
264 // or (pipeline_knot = k2 and pipeline_rkey = r2)
265 conditions = nil
266 args = nil
267 for _, p := range pipelines {
268 knotFilter := orm.FilterEq("pipeline_knot", p.Knot)
269 rkeyFilter := orm.FilterEq("pipeline_rkey", p.Rkey)
270 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition()))
271 args = append(args, p.Knot)
272 args = append(args, p.Rkey)
273 }
274 whereClause = ""
275 if conditions != nil {
276 whereClause = "where " + strings.Join(conditions, " or ")
277 }
278 query = fmt.Sprintf(`
279 select
280 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code
281 from
282 pipeline_statuses
283 %s
284 `, whereClause)
285
286 rows, err = e.Query(query, args...)
287 if err != nil {
288 return nil, err
289 }
290 defer rows.Close()
291
292 for rows.Next() {
293 var ps models.PipelineStatus
294 var created string
295
296 err := rows.Scan(
297 &ps.ID,
298 &ps.Spindle,
299 &ps.Rkey,
300 &ps.PipelineKnot,
301 &ps.PipelineRkey,
302 &created,
303 &ps.Workflow,
304 &ps.Status,
305 &ps.Error,
306 &ps.ExitCode,
307 )
308 if err != nil {
309 return nil, err
310 }
311
312 ps.Created, err = time.Parse(time.RFC3339, created)
313 if err != nil {
314 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err)
315 }
316
317 key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey)
318
319 // extract
320 pipeline, ok := pipelines[key]
321 if !ok {
322 continue
323 }
324 statuses, _ := pipeline.Statuses[ps.Workflow]
325 if !ok {
326 pipeline.Statuses[ps.Workflow] = models.WorkflowStatus{}
327 }
328
329 // append
330 statuses.Data = append(statuses.Data, ps)
331
332 // reassign
333 pipeline.Statuses[ps.Workflow] = statuses
334 pipelines[key] = pipeline
335 }
336
337 var all []models.Pipeline
338 for _, p := range pipelines {
339 for _, s := range p.Statuses {
340 slices.SortFunc(s.Data, func(a, b models.PipelineStatus) int {
341 if a.Created.After(b.Created) {
342 return 1
343 }
344 if a.Created.Before(b.Created) {
345 return -1
346 }
347 if a.ID > b.ID {
348 return 1
349 }
350 if a.ID < b.ID {
351 return -1
352 }
353 return 0
354 })
355 }
356 all = append(all, p)
357 }
358
359 // sort pipelines by date
360 slices.SortFunc(all, func(a, b models.Pipeline) int {
361 if a.Created.After(b.Created) {
362 return -1
363 }
364 return 1
365 })
366
367 return all, nil
368}