forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db 2 3import ( 4 "fmt" 5 "slices" 6 "strings" 7 "time" 8 9 "tangled.org/core/appview/models" 10) 11 12func GetPipelines(e Execer, filters ...filter) ([]models.Pipeline, error) { 13 var pipelines []models.Pipeline 14 15 var conditions []string 16 var args []any 17 for _, filter := range filters { 18 conditions = append(conditions, filter.Condition()) 19 args = append(args, filter.Arg()...) 20 } 21 22 whereClause := "" 23 if conditions != nil { 24 whereClause = " where " + strings.Join(conditions, " and ") 25 } 26 27 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause) 28 29 rows, err := e.Query(query, args...) 30 31 if err != nil { 32 return nil, err 33 } 34 defer rows.Close() 35 36 for rows.Next() { 37 var pipeline models.Pipeline 38 var createdAt string 39 err = rows.Scan( 40 &pipeline.Id, 41 &pipeline.Rkey, 42 &pipeline.Knot, 43 &pipeline.RepoOwner, 44 &pipeline.RepoName, 45 &pipeline.Sha, 46 &createdAt, 47 ) 48 if err != nil { 49 return nil, err 50 } 51 52 if t, err := time.Parse(time.RFC3339, createdAt); err == nil { 53 pipeline.Created = t 54 } 55 56 pipelines = append(pipelines, pipeline) 57 } 58 59 if err = rows.Err(); err != nil { 60 return nil, err 61 } 62 63 return pipelines, nil 64} 65 66func AddPipeline(e Execer, pipeline models.Pipeline) error { 67 args := []any{ 68 pipeline.Rkey, 69 pipeline.Knot, 70 pipeline.RepoOwner, 71 pipeline.RepoName, 72 pipeline.TriggerId, 73 pipeline.Sha, 74 } 75 76 placeholders := make([]string, len(args)) 77 for i := range placeholders { 78 placeholders[i] = "?" 79 } 80 81 query := fmt.Sprintf(` 82 insert or ignore into pipelines ( 83 rkey, 84 knot, 85 repo_owner, 86 repo_name, 87 trigger_id, 88 sha 89 ) values (%s) 90 `, strings.Join(placeholders, ",")) 91 92 _, err := e.Exec(query, args...) 93 94 return err 95} 96 97func AddTrigger(e Execer, trigger models.Trigger) (int64, error) { 98 args := []any{ 99 trigger.Kind, 100 trigger.PushRef, 101 trigger.PushNewSha, 102 trigger.PushOldSha, 103 trigger.PRSourceBranch, 104 trigger.PRTargetBranch, 105 trigger.PRSourceSha, 106 trigger.PRAction, 107 } 108 109 placeholders := make([]string, len(args)) 110 for i := range placeholders { 111 placeholders[i] = "?" 112 } 113 114 query := fmt.Sprintf(`insert or ignore into triggers ( 115 kind, 116 push_ref, 117 push_new_sha, 118 push_old_sha, 119 pr_source_branch, 120 pr_target_branch, 121 pr_source_sha, 122 pr_action 123 ) values (%s)`, strings.Join(placeholders, ",")) 124 125 res, err := e.Exec(query, args...) 126 if err != nil { 127 return 0, err 128 } 129 130 return res.LastInsertId() 131} 132 133func AddPipelineStatus(e Execer, status models.PipelineStatus) error { 134 args := []any{ 135 status.Spindle, 136 status.Rkey, 137 status.PipelineKnot, 138 status.PipelineRkey, 139 status.Workflow, 140 status.Status, 141 status.Error, 142 status.ExitCode, 143 status.Created.Format(time.RFC3339), 144 } 145 146 placeholders := make([]string, len(args)) 147 for i := range placeholders { 148 placeholders[i] = "?" 149 } 150 151 query := fmt.Sprintf(` 152 insert or ignore into pipeline_statuses ( 153 spindle, 154 rkey, 155 pipeline_knot, 156 pipeline_rkey, 157 workflow, 158 status, 159 error, 160 exit_code, 161 created 162 ) values (%s) 163 `, strings.Join(placeholders, ",")) 164 165 _, err := e.Exec(query, args...) 166 return err 167} 168 169// this is a mega query, but the most useful one: 170// get N pipelines, for each one get the latest status of its N workflows 171func GetPipelineStatuses(e Execer, limit int, filters ...filter) ([]models.Pipeline, error) { 172 var conditions []string 173 var args []any 174 for _, filter := range filters { 175 filter.key = "p." + filter.key // the table is aliased in the query to `p` 176 conditions = append(conditions, filter.Condition()) 177 args = append(args, filter.Arg()...) 178 } 179 180 whereClause := "" 181 if conditions != nil { 182 whereClause = " where " + strings.Join(conditions, " and ") 183 } 184 185 query := fmt.Sprintf(` 186 select 187 p.id, 188 p.knot, 189 p.rkey, 190 p.repo_owner, 191 p.repo_name, 192 p.sha, 193 p.created, 194 t.id, 195 t.kind, 196 t.push_ref, 197 t.push_new_sha, 198 t.push_old_sha, 199 t.pr_source_branch, 200 t.pr_target_branch, 201 t.pr_source_sha, 202 t.pr_action 203 from 204 pipelines p 205 join 206 triggers t ON p.trigger_id = t.id 207 %s 208 order by p.created desc 209 limit %d 210 `, whereClause, limit) 211 212 rows, err := e.Query(query, args...) 213 if err != nil { 214 return nil, err 215 } 216 defer rows.Close() 217 218 pipelines := make(map[string]models.Pipeline) 219 for rows.Next() { 220 var p models.Pipeline 221 var t models.Trigger 222 var created string 223 224 err := rows.Scan( 225 &p.Id, 226 &p.Knot, 227 &p.Rkey, 228 &p.RepoOwner, 229 &p.RepoName, 230 &p.Sha, 231 &created, 232 &p.TriggerId, 233 &t.Kind, 234 &t.PushRef, 235 &t.PushNewSha, 236 &t.PushOldSha, 237 &t.PRSourceBranch, 238 &t.PRTargetBranch, 239 &t.PRSourceSha, 240 &t.PRAction, 241 ) 242 if err != nil { 243 return nil, err 244 } 245 246 p.Created, err = time.Parse(time.RFC3339, created) 247 if err != nil { 248 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err) 249 } 250 251 t.Id = p.TriggerId 252 p.Trigger = &t 253 p.Statuses = make(map[string]models.WorkflowStatus) 254 255 k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey) 256 pipelines[k] = p 257 } 258 259 // get all statuses 260 // the where clause here is of the form: 261 // 262 // where (pipeline_knot = k1 and pipeline_rkey = r1) 263 // or (pipeline_knot = k2 and pipeline_rkey = r2) 264 conditions = nil 265 args = nil 266 for _, p := range pipelines { 267 knotFilter := FilterEq("pipeline_knot", p.Knot) 268 rkeyFilter := FilterEq("pipeline_rkey", p.Rkey) 269 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition())) 270 args = append(args, p.Knot) 271 args = append(args, p.Rkey) 272 } 273 whereClause = "" 274 if conditions != nil { 275 whereClause = "where " + strings.Join(conditions, " or ") 276 } 277 query = fmt.Sprintf(` 278 select 279 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code 280 from 281 pipeline_statuses 282 %s 283 `, whereClause) 284 285 rows, err = e.Query(query, args...) 286 if err != nil { 287 return nil, err 288 } 289 defer rows.Close() 290 291 for rows.Next() { 292 var ps models.PipelineStatus 293 var created string 294 295 err := rows.Scan( 296 &ps.ID, 297 &ps.Spindle, 298 &ps.Rkey, 299 &ps.PipelineKnot, 300 &ps.PipelineRkey, 301 &created, 302 &ps.Workflow, 303 &ps.Status, 304 &ps.Error, 305 &ps.ExitCode, 306 ) 307 if err != nil { 308 return nil, err 309 } 310 311 ps.Created, err = time.Parse(time.RFC3339, created) 312 if err != nil { 313 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err) 314 } 315 316 key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey) 317 318 // extract 319 pipeline, ok := pipelines[key] 320 if !ok { 321 continue 322 } 323 statuses, _ := pipeline.Statuses[ps.Workflow] 324 if !ok { 325 pipeline.Statuses[ps.Workflow] = models.WorkflowStatus{} 326 } 327 328 // append 329 statuses.Data = append(statuses.Data, ps) 330 331 // reassign 332 pipeline.Statuses[ps.Workflow] = statuses 333 pipelines[key] = pipeline 334 } 335 336 var all []models.Pipeline 337 for _, p := range pipelines { 338 for _, s := range p.Statuses { 339 slices.SortFunc(s.Data, func(a, b models.PipelineStatus) int { 340 if a.Created.After(b.Created) { 341 return 1 342 } 343 if a.Created.Before(b.Created) { 344 return -1 345 } 346 if a.ID > b.ID { 347 return 1 348 } 349 if a.ID < b.ID { 350 return -1 351 } 352 return 0 353 }) 354 } 355 all = append(all, p) 356 } 357 358 // sort pipelines by date 359 slices.SortFunc(all, func(a, b models.Pipeline) int { 360 if a.Created.After(b.Created) { 361 return -1 362 } 363 return 1 364 }) 365 366 return all, nil 367}