forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db
2
3import (
4 "fmt"
5 "slices"
6 "strings"
7 "time"
8
9 "tangled.org/core/appview/models"
10)
11
12func GetPipelines(e Execer, filters ...filter) ([]models.Pipeline, error) {
13 var pipelines []models.Pipeline
14
15 var conditions []string
16 var args []any
17 for _, filter := range filters {
18 conditions = append(conditions, filter.Condition())
19 args = append(args, filter.Arg()...)
20 }
21
22 whereClause := ""
23 if conditions != nil {
24 whereClause = " where " + strings.Join(conditions, " and ")
25 }
26
27 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause)
28
29 rows, err := e.Query(query, args...)
30
31 if err != nil {
32 return nil, err
33 }
34 defer rows.Close()
35
36 for rows.Next() {
37 var pipeline models.Pipeline
38 var createdAt string
39 err = rows.Scan(
40 &pipeline.Id,
41 &pipeline.Rkey,
42 &pipeline.Knot,
43 &pipeline.RepoOwner,
44 &pipeline.RepoName,
45 &pipeline.Sha,
46 &createdAt,
47 )
48 if err != nil {
49 return nil, err
50 }
51
52 if t, err := time.Parse(time.RFC3339, createdAt); err == nil {
53 pipeline.Created = t
54 }
55
56 pipelines = append(pipelines, pipeline)
57 }
58
59 if err = rows.Err(); err != nil {
60 return nil, err
61 }
62
63 return pipelines, nil
64}
65
66func AddPipeline(e Execer, pipeline models.Pipeline) error {
67 args := []any{
68 pipeline.Rkey,
69 pipeline.Knot,
70 pipeline.RepoOwner,
71 pipeline.RepoName,
72 pipeline.TriggerId,
73 pipeline.Sha,
74 }
75
76 placeholders := make([]string, len(args))
77 for i := range placeholders {
78 placeholders[i] = "?"
79 }
80
81 query := fmt.Sprintf(`
82 insert or ignore into pipelines (
83 rkey,
84 knot,
85 repo_owner,
86 repo_name,
87 trigger_id,
88 sha
89 ) values (%s)
90 `, strings.Join(placeholders, ","))
91
92 _, err := e.Exec(query, args...)
93
94 return err
95}
96
97func AddTrigger(e Execer, trigger models.Trigger) (int64, error) {
98 args := []any{
99 trigger.Kind,
100 trigger.PushRef,
101 trigger.PushNewSha,
102 trigger.PushOldSha,
103 trigger.PRSourceBranch,
104 trigger.PRTargetBranch,
105 trigger.PRSourceSha,
106 trigger.PRAction,
107 }
108
109 placeholders := make([]string, len(args))
110 for i := range placeholders {
111 placeholders[i] = "?"
112 }
113
114 query := fmt.Sprintf(`insert or ignore into triggers (
115 kind,
116 push_ref,
117 push_new_sha,
118 push_old_sha,
119 pr_source_branch,
120 pr_target_branch,
121 pr_source_sha,
122 pr_action
123 ) values (%s)`, strings.Join(placeholders, ","))
124
125 res, err := e.Exec(query, args...)
126 if err != nil {
127 return 0, err
128 }
129
130 return res.LastInsertId()
131}
132
133func AddPipelineStatus(e Execer, status models.PipelineStatus) error {
134 args := []any{
135 status.Spindle,
136 status.Rkey,
137 status.PipelineKnot,
138 status.PipelineRkey,
139 status.Workflow,
140 status.Status,
141 status.Error,
142 status.ExitCode,
143 status.Created.Format(time.RFC3339),
144 }
145
146 placeholders := make([]string, len(args))
147 for i := range placeholders {
148 placeholders[i] = "?"
149 }
150
151 query := fmt.Sprintf(`
152 insert or ignore into pipeline_statuses (
153 spindle,
154 rkey,
155 pipeline_knot,
156 pipeline_rkey,
157 workflow,
158 status,
159 error,
160 exit_code,
161 created
162 ) values (%s)
163 `, strings.Join(placeholders, ","))
164
165 _, err := e.Exec(query, args...)
166 return err
167}
168
169// this is a mega query, but the most useful one:
170// get N pipelines, for each one get the latest status of its N workflows
171func GetPipelineStatuses(e Execer, limit int, filters ...filter) ([]models.Pipeline, error) {
172 var conditions []string
173 var args []any
174 for _, filter := range filters {
175 filter.key = "p." + filter.key // the table is aliased in the query to `p`
176 conditions = append(conditions, filter.Condition())
177 args = append(args, filter.Arg()...)
178 }
179
180 whereClause := ""
181 if conditions != nil {
182 whereClause = " where " + strings.Join(conditions, " and ")
183 }
184
185 query := fmt.Sprintf(`
186 select
187 p.id,
188 p.knot,
189 p.rkey,
190 p.repo_owner,
191 p.repo_name,
192 p.sha,
193 p.created,
194 t.id,
195 t.kind,
196 t.push_ref,
197 t.push_new_sha,
198 t.push_old_sha,
199 t.pr_source_branch,
200 t.pr_target_branch,
201 t.pr_source_sha,
202 t.pr_action
203 from
204 pipelines p
205 join
206 triggers t ON p.trigger_id = t.id
207 %s
208 order by p.created desc
209 limit %d
210 `, whereClause, limit)
211
212 rows, err := e.Query(query, args...)
213 if err != nil {
214 return nil, err
215 }
216 defer rows.Close()
217
218 pipelines := make(map[string]models.Pipeline)
219 for rows.Next() {
220 var p models.Pipeline
221 var t models.Trigger
222 var created string
223
224 err := rows.Scan(
225 &p.Id,
226 &p.Knot,
227 &p.Rkey,
228 &p.RepoOwner,
229 &p.RepoName,
230 &p.Sha,
231 &created,
232 &p.TriggerId,
233 &t.Kind,
234 &t.PushRef,
235 &t.PushNewSha,
236 &t.PushOldSha,
237 &t.PRSourceBranch,
238 &t.PRTargetBranch,
239 &t.PRSourceSha,
240 &t.PRAction,
241 )
242 if err != nil {
243 return nil, err
244 }
245
246 p.Created, err = time.Parse(time.RFC3339, created)
247 if err != nil {
248 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err)
249 }
250
251 t.Id = p.TriggerId
252 p.Trigger = &t
253 p.Statuses = make(map[string]models.WorkflowStatus)
254
255 k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey)
256 pipelines[k] = p
257 }
258
259 // get all statuses
260 // the where clause here is of the form:
261 //
262 // where (pipeline_knot = k1 and pipeline_rkey = r1)
263 // or (pipeline_knot = k2 and pipeline_rkey = r2)
264 conditions = nil
265 args = nil
266 for _, p := range pipelines {
267 knotFilter := FilterEq("pipeline_knot", p.Knot)
268 rkeyFilter := FilterEq("pipeline_rkey", p.Rkey)
269 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition()))
270 args = append(args, p.Knot)
271 args = append(args, p.Rkey)
272 }
273 whereClause = ""
274 if conditions != nil {
275 whereClause = "where " + strings.Join(conditions, " or ")
276 }
277 query = fmt.Sprintf(`
278 select
279 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code
280 from
281 pipeline_statuses
282 %s
283 `, whereClause)
284
285 rows, err = e.Query(query, args...)
286 if err != nil {
287 return nil, err
288 }
289 defer rows.Close()
290
291 for rows.Next() {
292 var ps models.PipelineStatus
293 var created string
294
295 err := rows.Scan(
296 &ps.ID,
297 &ps.Spindle,
298 &ps.Rkey,
299 &ps.PipelineKnot,
300 &ps.PipelineRkey,
301 &created,
302 &ps.Workflow,
303 &ps.Status,
304 &ps.Error,
305 &ps.ExitCode,
306 )
307 if err != nil {
308 return nil, err
309 }
310
311 ps.Created, err = time.Parse(time.RFC3339, created)
312 if err != nil {
313 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err)
314 }
315
316 key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey)
317
318 // extract
319 pipeline, ok := pipelines[key]
320 if !ok {
321 continue
322 }
323 statuses, _ := pipeline.Statuses[ps.Workflow]
324 if !ok {
325 pipeline.Statuses[ps.Workflow] = models.WorkflowStatus{}
326 }
327
328 // append
329 statuses.Data = append(statuses.Data, ps)
330
331 // reassign
332 pipeline.Statuses[ps.Workflow] = statuses
333 pipelines[key] = pipeline
334 }
335
336 var all []models.Pipeline
337 for _, p := range pipelines {
338 for _, s := range p.Statuses {
339 slices.SortFunc(s.Data, func(a, b models.PipelineStatus) int {
340 if a.Created.After(b.Created) {
341 return 1
342 }
343 if a.Created.Before(b.Created) {
344 return -1
345 }
346 if a.ID > b.ID {
347 return 1
348 }
349 if a.ID < b.ID {
350 return -1
351 }
352 return 0
353 })
354 }
355 all = append(all, p)
356 }
357
358 // sort pipelines by date
359 slices.SortFunc(all, func(a, b models.Pipeline) int {
360 if a.Created.After(b.Created) {
361 return -1
362 }
363 return 1
364 })
365
366 return all, nil
367}