forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
at test-ci 9.4 kB view raw
1package db 2 3import ( 4 "fmt" 5 "slices" 6 "strings" 7 "time" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "github.com/go-git/go-git/v5/plumbing" 11 spindle "tangled.sh/tangled.sh/core/spindle/models" 12) 13 14type Pipeline struct { 15 Id int 16 Rkey string 17 Knot string 18 RepoOwner syntax.DID 19 RepoName string 20 TriggerId int 21 Sha string 22 Created time.Time 23 24 // populate when querying for reverse mappings 25 Trigger *Trigger 26 Statuses map[string]WorkflowStatus 27} 28 29type WorkflowStatus struct { 30 Data []PipelineStatus 31} 32 33func (w WorkflowStatus) Latest() PipelineStatus { 34 return w.Data[len(w.Data)-1] 35} 36 37// time taken by this workflow to reach an "end state" 38func (w WorkflowStatus) TimeTaken() time.Duration { 39 var start, end *time.Time 40 for _, s := range w.Data { 41 if s.Status.IsStart() { 42 start = &s.Created 43 } 44 if s.Status.IsFinish() { 45 end = &s.Created 46 } 47 } 48 49 if start != nil && end != nil && end.After(*start) { 50 return end.Sub(*start) 51 } 52 53 return 0 54} 55 56func (p Pipeline) Counts() map[string]int { 57 m := make(map[string]int) 58 for _, w := range p.Statuses { 59 m[w.Latest().Status.String()] += 1 60 } 61 return m 62} 63 64func (p Pipeline) TimeTaken() time.Duration { 65 var s time.Duration 66 for _, w := range p.Statuses { 67 s += w.TimeTaken() 68 } 69 return s 70} 71 72func (p Pipeline) Workflows() []string { 73 var ws []string 74 for v := range p.Statuses { 75 ws = append(ws, v) 76 } 77 slices.Sort(ws) 78 return ws 79} 80 81// if we know that a spindle has picked up this pipeline, then it is Responding 82func (p Pipeline) IsResponding() bool { 83 return len(p.Statuses) != 0 84} 85 86type Trigger struct { 87 Id int 88 Kind string 89 90 // push trigger fields 91 PushRef *string 92 PushNewSha *string 93 PushOldSha *string 94 95 // pull request trigger fields 96 PRSourceBranch *string 97 PRTargetBranch *string 98 PRSourceSha *string 99 PRAction *string 100} 101 102func (t *Trigger) IsPush() bool { 103 return t != nil && t.Kind == "push" 104} 105 106func (t *Trigger) IsPullRequest() bool { 107 return t != nil && t.Kind == "pull_request" 108} 109 110func (t *Trigger) TargetRef() string { 111 if t.IsPush() { 112 return plumbing.ReferenceName(*t.PushRef).Short() 113 } else if t.IsPullRequest() { 114 return *t.PRTargetBranch 115 } 116 117 return "" 118} 119 120type PipelineStatus struct { 121 ID int 122 Spindle string 123 Rkey string 124 PipelineKnot string 125 PipelineRkey string 126 Created time.Time 127 Workflow string 128 Status spindle.StatusKind 129 Error *string 130 ExitCode int 131} 132 133func GetPipelines(e Execer, filters ...filter) ([]Pipeline, error) { 134 var pipelines []Pipeline 135 136 var conditions []string 137 var args []any 138 for _, filter := range filters { 139 conditions = append(conditions, filter.Condition()) 140 args = append(args, filter.Arg()...) 141 } 142 143 whereClause := "" 144 if conditions != nil { 145 whereClause = " where " + strings.Join(conditions, " and ") 146 } 147 148 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause) 149 150 rows, err := e.Query(query, args...) 151 152 if err != nil { 153 return nil, err 154 } 155 defer rows.Close() 156 157 for rows.Next() { 158 var pipeline Pipeline 159 var createdAt string 160 err = rows.Scan( 161 &pipeline.Id, 162 &pipeline.Rkey, 163 &pipeline.Knot, 164 &pipeline.RepoOwner, 165 &pipeline.RepoName, 166 &pipeline.Sha, 167 &createdAt, 168 ) 169 if err != nil { 170 return nil, err 171 } 172 173 if t, err := time.Parse(time.RFC3339, createdAt); err == nil { 174 pipeline.Created = t 175 } 176 177 pipelines = append(pipelines, pipeline) 178 } 179 180 if err = rows.Err(); err != nil { 181 return nil, err 182 } 183 184 return pipelines, nil 185} 186 187func AddPipeline(e Execer, pipeline Pipeline) error { 188 args := []any{ 189 pipeline.Rkey, 190 pipeline.Knot, 191 pipeline.RepoOwner, 192 pipeline.RepoName, 193 pipeline.TriggerId, 194 pipeline.Sha, 195 } 196 197 placeholders := make([]string, len(args)) 198 for i := range placeholders { 199 placeholders[i] = "?" 200 } 201 202 query := fmt.Sprintf(` 203 insert or ignore into pipelines ( 204 rkey, 205 knot, 206 repo_owner, 207 repo_name, 208 trigger_id, 209 sha 210 ) values (%s) 211 `, strings.Join(placeholders, ",")) 212 213 _, err := e.Exec(query, args...) 214 215 return err 216} 217 218func AddTrigger(e Execer, trigger Trigger) (int64, error) { 219 args := []any{ 220 trigger.Kind, 221 trigger.PushRef, 222 trigger.PushNewSha, 223 trigger.PushOldSha, 224 trigger.PRSourceBranch, 225 trigger.PRTargetBranch, 226 trigger.PRSourceSha, 227 trigger.PRAction, 228 } 229 230 placeholders := make([]string, len(args)) 231 for i := range placeholders { 232 placeholders[i] = "?" 233 } 234 235 query := fmt.Sprintf(`insert or ignore into triggers ( 236 kind, 237 push_ref, 238 push_new_sha, 239 push_old_sha, 240 pr_source_branch, 241 pr_target_branch, 242 pr_source_sha, 243 pr_action 244 ) values (%s)`, strings.Join(placeholders, ",")) 245 246 res, err := e.Exec(query, args...) 247 if err != nil { 248 return 0, err 249 } 250 251 return res.LastInsertId() 252} 253 254func AddPipelineStatus(e Execer, status PipelineStatus) error { 255 args := []any{ 256 status.Spindle, 257 status.Rkey, 258 status.PipelineKnot, 259 status.PipelineRkey, 260 status.Workflow, 261 status.Status, 262 status.Error, 263 status.ExitCode, 264 status.Created.Format(time.RFC3339), 265 } 266 267 placeholders := make([]string, len(args)) 268 for i := range placeholders { 269 placeholders[i] = "?" 270 } 271 272 query := fmt.Sprintf(` 273 insert or ignore into pipeline_statuses ( 274 spindle, 275 rkey, 276 pipeline_knot, 277 pipeline_rkey, 278 workflow, 279 status, 280 error, 281 exit_code, 282 created 283 ) values (%s) 284 `, strings.Join(placeholders, ",")) 285 286 _, err := e.Exec(query, args...) 287 return err 288} 289 290// this is a mega query, but the most useful one: 291// get N pipelines, for each one get the latest status of its N workflows 292func GetPipelineStatuses(e Execer, filters ...filter) ([]Pipeline, error) { 293 var conditions []string 294 var args []any 295 for _, filter := range filters { 296 filter.key = "p." + filter.key // the table is aliased in the query to `p` 297 conditions = append(conditions, filter.Condition()) 298 args = append(args, filter.Arg()...) 299 } 300 301 whereClause := "" 302 if conditions != nil { 303 whereClause = " where " + strings.Join(conditions, " and ") 304 } 305 306 query := fmt.Sprintf(` 307 select 308 p.id, 309 p.knot, 310 p.rkey, 311 p.repo_owner, 312 p.repo_name, 313 p.sha, 314 p.created, 315 t.id, 316 t.kind, 317 t.push_ref, 318 t.push_new_sha, 319 t.push_old_sha, 320 t.pr_source_branch, 321 t.pr_target_branch, 322 t.pr_source_sha, 323 t.pr_action 324 from 325 pipelines p 326 join 327 triggers t ON p.trigger_id = t.id 328 %s 329 `, whereClause) 330 331 rows, err := e.Query(query, args...) 332 if err != nil { 333 return nil, err 334 } 335 defer rows.Close() 336 337 pipelines := make(map[string]Pipeline) 338 for rows.Next() { 339 var p Pipeline 340 var t Trigger 341 var created string 342 343 err := rows.Scan( 344 &p.Id, 345 &p.Knot, 346 &p.Rkey, 347 &p.RepoOwner, 348 &p.RepoName, 349 &p.Sha, 350 &created, 351 &p.TriggerId, 352 &t.Kind, 353 &t.PushRef, 354 &t.PushNewSha, 355 &t.PushOldSha, 356 &t.PRSourceBranch, 357 &t.PRTargetBranch, 358 &t.PRSourceSha, 359 &t.PRAction, 360 ) 361 if err != nil { 362 return nil, err 363 } 364 365 p.Created, err = time.Parse(time.RFC3339, created) 366 if err != nil { 367 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err) 368 } 369 370 t.Id = p.TriggerId 371 p.Trigger = &t 372 p.Statuses = make(map[string]WorkflowStatus) 373 374 k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey) 375 pipelines[k] = p 376 } 377 378 // get all statuses 379 // the where clause here is of the form: 380 // 381 // where (pipeline_knot = k1 and pipeline_rkey = r1) 382 // or (pipeline_knot = k2 and pipeline_rkey = r2) 383 conditions = nil 384 args = nil 385 for _, p := range pipelines { 386 knotFilter := FilterEq("pipeline_knot", p.Knot) 387 rkeyFilter := FilterEq("pipeline_rkey", p.Rkey) 388 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition())) 389 args = append(args, p.Knot) 390 args = append(args, p.Rkey) 391 } 392 whereClause = "" 393 if conditions != nil { 394 whereClause = "where " + strings.Join(conditions, " or ") 395 } 396 query = fmt.Sprintf(` 397 select 398 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code 399 from 400 pipeline_statuses 401 %s 402 `, whereClause) 403 404 rows, err = e.Query(query, args...) 405 if err != nil { 406 return nil, err 407 } 408 defer rows.Close() 409 410 for rows.Next() { 411 var ps PipelineStatus 412 var created string 413 414 err := rows.Scan( 415 &ps.ID, 416 &ps.Spindle, 417 &ps.Rkey, 418 &ps.PipelineKnot, 419 &ps.PipelineRkey, 420 &created, 421 &ps.Workflow, 422 &ps.Status, 423 &ps.Error, 424 &ps.ExitCode, 425 ) 426 if err != nil { 427 return nil, err 428 } 429 430 ps.Created, err = time.Parse(time.RFC3339, created) 431 if err != nil { 432 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err) 433 } 434 435 key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey) 436 437 // extract 438 pipeline, ok := pipelines[key] 439 if !ok { 440 continue 441 } 442 statuses, _ := pipeline.Statuses[ps.Workflow] 443 if !ok { 444 pipeline.Statuses[ps.Workflow] = WorkflowStatus{} 445 } 446 447 // append 448 statuses.Data = append(statuses.Data, ps) 449 450 // reassign 451 pipeline.Statuses[ps.Workflow] = statuses 452 pipelines[key] = pipeline 453 } 454 455 var all []Pipeline 456 for _, p := range pipelines { 457 for _, s := range p.Statuses { 458 slices.SortFunc(s.Data, func(a, b PipelineStatus) int { 459 if a.Created.After(b.Created) { 460 return 1 461 } 462 if a.Created.Before(b.Created) { 463 return -1 464 } 465 if a.ID > b.ID { 466 return 1 467 } 468 if a.ID < b.ID { 469 return -1 470 } 471 return 0 472 }) 473 } 474 all = append(all, p) 475 } 476 477 // sort pipelines by date 478 slices.SortFunc(all, func(a, b Pipeline) int { 479 if a.Created.After(b.Created) { 480 return -1 481 } 482 return 1 483 }) 484 485 return all, nil 486}