forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
at master 7.3 kB view raw
1package db 2 3import ( 4 "fmt" 5 "slices" 6 "strings" 7 "time" 8 9 "tangled.org/core/appview/models" 10 "tangled.org/core/orm" 11) 12 13func GetPipelines(e Execer, filters ...orm.Filter) ([]models.Pipeline, error) { 14 var pipelines []models.Pipeline 15 16 var conditions []string 17 var args []any 18 for _, filter := range filters { 19 conditions = append(conditions, filter.Condition()) 20 args = append(args, filter.Arg()...) 21 } 22 23 whereClause := "" 24 if conditions != nil { 25 whereClause = " where " + strings.Join(conditions, " and ") 26 } 27 28 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause) 29 30 rows, err := e.Query(query, args...) 31 32 if err != nil { 33 return nil, err 34 } 35 defer rows.Close() 36 37 for rows.Next() { 38 var pipeline models.Pipeline 39 var createdAt string 40 err = rows.Scan( 41 &pipeline.Id, 42 &pipeline.Rkey, 43 &pipeline.Knot, 44 &pipeline.RepoOwner, 45 &pipeline.RepoName, 46 &pipeline.Sha, 47 &createdAt, 48 ) 49 if err != nil { 50 return nil, err 51 } 52 53 if t, err := time.Parse(time.RFC3339, createdAt); err == nil { 54 pipeline.Created = t 55 } 56 57 pipelines = append(pipelines, pipeline) 58 } 59 60 if err = rows.Err(); err != nil { 61 return nil, err 62 } 63 64 return pipelines, nil 65} 66 67func AddPipeline(e Execer, pipeline models.Pipeline) error { 68 args := []any{ 69 pipeline.Rkey, 70 pipeline.Knot, 71 pipeline.RepoOwner, 72 pipeline.RepoName, 73 pipeline.TriggerId, 74 pipeline.Sha, 75 } 76 77 placeholders := make([]string, len(args)) 78 for i := range placeholders { 79 placeholders[i] = "?" 80 } 81 82 query := fmt.Sprintf(` 83 insert or ignore into pipelines ( 84 rkey, 85 knot, 86 repo_owner, 87 repo_name, 88 trigger_id, 89 sha 90 ) values (%s) 91 `, strings.Join(placeholders, ",")) 92 93 _, err := e.Exec(query, args...) 94 95 return err 96} 97 98func AddTrigger(e Execer, trigger models.Trigger) (int64, error) { 99 args := []any{ 100 trigger.Kind, 101 trigger.PushRef, 102 trigger.PushNewSha, 103 trigger.PushOldSha, 104 trigger.PRSourceBranch, 105 trigger.PRTargetBranch, 106 trigger.PRSourceSha, 107 trigger.PRAction, 108 } 109 110 placeholders := make([]string, len(args)) 111 for i := range placeholders { 112 placeholders[i] = "?" 113 } 114 115 query := fmt.Sprintf(`insert or ignore into triggers ( 116 kind, 117 push_ref, 118 push_new_sha, 119 push_old_sha, 120 pr_source_branch, 121 pr_target_branch, 122 pr_source_sha, 123 pr_action 124 ) values (%s)`, strings.Join(placeholders, ",")) 125 126 res, err := e.Exec(query, args...) 127 if err != nil { 128 return 0, err 129 } 130 131 return res.LastInsertId() 132} 133 134func AddPipelineStatus(e Execer, status models.PipelineStatus) error { 135 args := []any{ 136 status.Spindle, 137 status.Rkey, 138 status.PipelineKnot, 139 status.PipelineRkey, 140 status.Workflow, 141 status.Status, 142 status.Error, 143 status.ExitCode, 144 status.Created.Format(time.RFC3339), 145 } 146 147 placeholders := make([]string, len(args)) 148 for i := range placeholders { 149 placeholders[i] = "?" 150 } 151 152 query := fmt.Sprintf(` 153 insert or ignore into pipeline_statuses ( 154 spindle, 155 rkey, 156 pipeline_knot, 157 pipeline_rkey, 158 workflow, 159 status, 160 error, 161 exit_code, 162 created 163 ) values (%s) 164 `, strings.Join(placeholders, ",")) 165 166 _, err := e.Exec(query, args...) 167 return err 168} 169 170// this is a mega query, but the most useful one: 171// get N pipelines, for each one get the latest status of its N workflows 172func GetPipelineStatuses(e Execer, limit int, filters ...orm.Filter) ([]models.Pipeline, error) { 173 var conditions []string 174 var args []any 175 for _, filter := range filters { 176 filter.Key = "p." + filter.Key // the table is aliased in the query to `p` 177 conditions = append(conditions, filter.Condition()) 178 args = append(args, filter.Arg()...) 179 } 180 181 whereClause := "" 182 if conditions != nil { 183 whereClause = " where " + strings.Join(conditions, " and ") 184 } 185 186 query := fmt.Sprintf(` 187 select 188 p.id, 189 p.knot, 190 p.rkey, 191 p.repo_owner, 192 p.repo_name, 193 p.sha, 194 p.created, 195 t.id, 196 t.kind, 197 t.push_ref, 198 t.push_new_sha, 199 t.push_old_sha, 200 t.pr_source_branch, 201 t.pr_target_branch, 202 t.pr_source_sha, 203 t.pr_action 204 from 205 pipelines p 206 join 207 triggers t ON p.trigger_id = t.id 208 %s 209 order by p.created desc 210 limit %d 211 `, whereClause, limit) 212 213 rows, err := e.Query(query, args...) 214 if err != nil { 215 return nil, err 216 } 217 defer rows.Close() 218 219 pipelines := make(map[string]models.Pipeline) 220 for rows.Next() { 221 var p models.Pipeline 222 var t models.Trigger 223 var created string 224 225 err := rows.Scan( 226 &p.Id, 227 &p.Knot, 228 &p.Rkey, 229 &p.RepoOwner, 230 &p.RepoName, 231 &p.Sha, 232 &created, 233 &p.TriggerId, 234 &t.Kind, 235 &t.PushRef, 236 &t.PushNewSha, 237 &t.PushOldSha, 238 &t.PRSourceBranch, 239 &t.PRTargetBranch, 240 &t.PRSourceSha, 241 &t.PRAction, 242 ) 243 if err != nil { 244 return nil, err 245 } 246 247 p.Created, err = time.Parse(time.RFC3339, created) 248 if err != nil { 249 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err) 250 } 251 252 t.Id = p.TriggerId 253 p.Trigger = &t 254 p.Statuses = make(map[string]models.WorkflowStatus) 255 256 k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey) 257 pipelines[k] = p 258 } 259 260 // get all statuses 261 // the where clause here is of the form: 262 // 263 // where (pipeline_knot = k1 and pipeline_rkey = r1) 264 // or (pipeline_knot = k2 and pipeline_rkey = r2) 265 conditions = nil 266 args = nil 267 for _, p := range pipelines { 268 knotFilter := orm.FilterEq("pipeline_knot", p.Knot) 269 rkeyFilter := orm.FilterEq("pipeline_rkey", p.Rkey) 270 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition())) 271 args = append(args, p.Knot) 272 args = append(args, p.Rkey) 273 } 274 whereClause = "" 275 if conditions != nil { 276 whereClause = "where " + strings.Join(conditions, " or ") 277 } 278 query = fmt.Sprintf(` 279 select 280 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code 281 from 282 pipeline_statuses 283 %s 284 `, whereClause) 285 286 rows, err = e.Query(query, args...) 287 if err != nil { 288 return nil, err 289 } 290 defer rows.Close() 291 292 for rows.Next() { 293 var ps models.PipelineStatus 294 var created string 295 296 err := rows.Scan( 297 &ps.ID, 298 &ps.Spindle, 299 &ps.Rkey, 300 &ps.PipelineKnot, 301 &ps.PipelineRkey, 302 &created, 303 &ps.Workflow, 304 &ps.Status, 305 &ps.Error, 306 &ps.ExitCode, 307 ) 308 if err != nil { 309 return nil, err 310 } 311 312 ps.Created, err = time.Parse(time.RFC3339, created) 313 if err != nil { 314 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err) 315 } 316 317 key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey) 318 319 // extract 320 pipeline, ok := pipelines[key] 321 if !ok { 322 continue 323 } 324 statuses, _ := pipeline.Statuses[ps.Workflow] 325 if !ok { 326 pipeline.Statuses[ps.Workflow] = models.WorkflowStatus{} 327 } 328 329 // append 330 statuses.Data = append(statuses.Data, ps) 331 332 // reassign 333 pipeline.Statuses[ps.Workflow] = statuses 334 pipelines[key] = pipeline 335 } 336 337 var all []models.Pipeline 338 for _, p := range pipelines { 339 for _, s := range p.Statuses { 340 slices.SortFunc(s.Data, func(a, b models.PipelineStatus) int { 341 if a.Created.After(b.Created) { 342 return 1 343 } 344 if a.Created.Before(b.Created) { 345 return -1 346 } 347 if a.ID > b.ID { 348 return 1 349 } 350 if a.ID < b.ID { 351 return -1 352 } 353 return 0 354 }) 355 } 356 all = append(all, p) 357 } 358 359 // sort pipelines by date 360 slices.SortFunc(all, func(a, b models.Pipeline) int { 361 if a.Created.After(b.Created) { 362 return -1 363 } 364 return 1 365 }) 366 367 return all, nil 368}