forked from tangled.org/core
this repo has no description
at master 7.2 kB view raw
1package db 2 3import ( 4 "fmt" 5 "slices" 6 "strings" 7 "time" 8 9 "tangled.org/core/appview/models" 10) 11 12func GetPipelines(e Execer, filters ...filter) ([]models.Pipeline, error) { 13 var pipelines []models.Pipeline 14 15 var conditions []string 16 var args []any 17 for _, filter := range filters { 18 conditions = append(conditions, filter.Condition()) 19 args = append(args, filter.Arg()...) 20 } 21 22 whereClause := "" 23 if conditions != nil { 24 whereClause = " where " + strings.Join(conditions, " and ") 25 } 26 27 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause) 28 29 rows, err := e.Query(query, args...) 30 31 if err != nil { 32 return nil, err 33 } 34 defer rows.Close() 35 36 for rows.Next() { 37 var pipeline models.Pipeline 38 var createdAt string 39 err = rows.Scan( 40 &pipeline.Id, 41 &pipeline.Rkey, 42 &pipeline.Knot, 43 &pipeline.RepoOwner, 44 &pipeline.RepoName, 45 &pipeline.Sha, 46 &createdAt, 47 ) 48 if err != nil { 49 return nil, err 50 } 51 52 if t, err := time.Parse(time.RFC3339, createdAt); err == nil { 53 pipeline.Created = t 54 } 55 56 pipelines = append(pipelines, pipeline) 57 } 58 59 if err = rows.Err(); err != nil { 60 return nil, err 61 } 62 63 return pipelines, nil 64} 65 66func AddPipeline(e Execer, pipeline models.Pipeline) error { 67 args := []any{ 68 pipeline.Rkey, 69 pipeline.Knot, 70 pipeline.RepoOwner, 71 pipeline.RepoName, 72 pipeline.TriggerId, 73 pipeline.Sha, 74 } 75 76 placeholders := make([]string, len(args)) 77 for i := range placeholders { 78 placeholders[i] = "?" 79 } 80 81 query := fmt.Sprintf(` 82 insert or ignore into pipelines ( 83 rkey, 84 knot, 85 repo_owner, 86 repo_name, 87 trigger_id, 88 sha 89 ) values (%s) 90 `, strings.Join(placeholders, ",")) 91 92 _, err := e.Exec(query, args...) 93 94 return err 95} 96 97func AddTrigger(e Execer, trigger models.Trigger) (int64, error) { 98 args := []any{ 99 trigger.Kind, 100 trigger.PushRef, 101 trigger.PushNewSha, 102 trigger.PushOldSha, 103 trigger.PRSourceBranch, 104 trigger.PRTargetBranch, 105 trigger.PRSourceSha, 106 trigger.PRAction, 107 } 108 109 placeholders := make([]string, len(args)) 110 for i := range placeholders { 111 placeholders[i] = "?" 112 } 113 114 query := fmt.Sprintf(`insert or ignore into triggers ( 115 kind, 116 push_ref, 117 push_new_sha, 118 push_old_sha, 119 pr_source_branch, 120 pr_target_branch, 121 pr_source_sha, 122 pr_action 123 ) values (%s)`, strings.Join(placeholders, ",")) 124 125 res, err := e.Exec(query, args...) 126 if err != nil { 127 return 0, err 128 } 129 130 return res.LastInsertId() 131} 132 133func AddPipelineStatus(e Execer, status models.PipelineStatus) error { 134 args := []any{ 135 status.Spindle, 136 status.Rkey, 137 status.PipelineKnot, 138 status.PipelineRkey, 139 status.Workflow, 140 status.Status, 141 status.Error, 142 status.ExitCode, 143 status.Created.Format(time.RFC3339), 144 } 145 146 placeholders := make([]string, len(args)) 147 for i := range placeholders { 148 placeholders[i] = "?" 149 } 150 151 query := fmt.Sprintf(` 152 insert or ignore into pipeline_statuses ( 153 spindle, 154 rkey, 155 pipeline_knot, 156 pipeline_rkey, 157 workflow, 158 status, 159 error, 160 exit_code, 161 created 162 ) values (%s) 163 `, strings.Join(placeholders, ",")) 164 165 _, err := e.Exec(query, args...) 166 return err 167} 168 169// this is a mega query, but the most useful one: 170// get N pipelines, for each one get the latest status of its N workflows 171func GetPipelineStatuses(e Execer, filters ...filter) ([]models.Pipeline, error) { 172 var conditions []string 173 var args []any 174 for _, filter := range filters { 175 filter.key = "p." + filter.key // the table is aliased in the query to `p` 176 conditions = append(conditions, filter.Condition()) 177 args = append(args, filter.Arg()...) 178 } 179 180 whereClause := "" 181 if conditions != nil { 182 whereClause = " where " + strings.Join(conditions, " and ") 183 } 184 185 query := fmt.Sprintf(` 186 select 187 p.id, 188 p.knot, 189 p.rkey, 190 p.repo_owner, 191 p.repo_name, 192 p.sha, 193 p.created, 194 t.id, 195 t.kind, 196 t.push_ref, 197 t.push_new_sha, 198 t.push_old_sha, 199 t.pr_source_branch, 200 t.pr_target_branch, 201 t.pr_source_sha, 202 t.pr_action 203 from 204 pipelines p 205 join 206 triggers t ON p.trigger_id = t.id 207 %s 208 `, whereClause) 209 210 rows, err := e.Query(query, args...) 211 if err != nil { 212 return nil, err 213 } 214 defer rows.Close() 215 216 pipelines := make(map[string]models.Pipeline) 217 for rows.Next() { 218 var p models.Pipeline 219 var t models.Trigger 220 var created string 221 222 err := rows.Scan( 223 &p.Id, 224 &p.Knot, 225 &p.Rkey, 226 &p.RepoOwner, 227 &p.RepoName, 228 &p.Sha, 229 &created, 230 &p.TriggerId, 231 &t.Kind, 232 &t.PushRef, 233 &t.PushNewSha, 234 &t.PushOldSha, 235 &t.PRSourceBranch, 236 &t.PRTargetBranch, 237 &t.PRSourceSha, 238 &t.PRAction, 239 ) 240 if err != nil { 241 return nil, err 242 } 243 244 p.Created, err = time.Parse(time.RFC3339, created) 245 if err != nil { 246 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err) 247 } 248 249 t.Id = p.TriggerId 250 p.Trigger = &t 251 p.Statuses = make(map[string]models.WorkflowStatus) 252 253 k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey) 254 pipelines[k] = p 255 } 256 257 // get all statuses 258 // the where clause here is of the form: 259 // 260 // where (pipeline_knot = k1 and pipeline_rkey = r1) 261 // or (pipeline_knot = k2 and pipeline_rkey = r2) 262 conditions = nil 263 args = nil 264 for _, p := range pipelines { 265 knotFilter := FilterEq("pipeline_knot", p.Knot) 266 rkeyFilter := FilterEq("pipeline_rkey", p.Rkey) 267 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition())) 268 args = append(args, p.Knot) 269 args = append(args, p.Rkey) 270 } 271 whereClause = "" 272 if conditions != nil { 273 whereClause = "where " + strings.Join(conditions, " or ") 274 } 275 query = fmt.Sprintf(` 276 select 277 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code 278 from 279 pipeline_statuses 280 %s 281 `, whereClause) 282 283 rows, err = e.Query(query, args...) 284 if err != nil { 285 return nil, err 286 } 287 defer rows.Close() 288 289 for rows.Next() { 290 var ps models.PipelineStatus 291 var created string 292 293 err := rows.Scan( 294 &ps.ID, 295 &ps.Spindle, 296 &ps.Rkey, 297 &ps.PipelineKnot, 298 &ps.PipelineRkey, 299 &created, 300 &ps.Workflow, 301 &ps.Status, 302 &ps.Error, 303 &ps.ExitCode, 304 ) 305 if err != nil { 306 return nil, err 307 } 308 309 ps.Created, err = time.Parse(time.RFC3339, created) 310 if err != nil { 311 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err) 312 } 313 314 key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey) 315 316 // extract 317 pipeline, ok := pipelines[key] 318 if !ok { 319 continue 320 } 321 statuses, _ := pipeline.Statuses[ps.Workflow] 322 if !ok { 323 pipeline.Statuses[ps.Workflow] = models.WorkflowStatus{} 324 } 325 326 // append 327 statuses.Data = append(statuses.Data, ps) 328 329 // reassign 330 pipeline.Statuses[ps.Workflow] = statuses 331 pipelines[key] = pipeline 332 } 333 334 var all []models.Pipeline 335 for _, p := range pipelines { 336 for _, s := range p.Statuses { 337 slices.SortFunc(s.Data, func(a, b models.PipelineStatus) int { 338 if a.Created.After(b.Created) { 339 return 1 340 } 341 if a.Created.Before(b.Created) { 342 return -1 343 } 344 if a.ID > b.ID { 345 return 1 346 } 347 if a.ID < b.ID { 348 return -1 349 } 350 return 0 351 }) 352 } 353 all = append(all, p) 354 } 355 356 // sort pipelines by date 357 slices.SortFunc(all, func(a, b models.Pipeline) int { 358 if a.Created.After(b.Created) { 359 return -1 360 } 361 return 1 362 }) 363 364 return all, nil 365}