// forked from tangled.org/core
// this repo has no description
1package db 2 3import ( 4 "fmt" 5 "slices" 6 "strings" 7 "time" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "github.com/go-git/go-git/v5/plumbing" 11 spindle "tangled.sh/tangled.sh/core/spindle/models" 12) 13 14type Pipeline struct { 15 Id int 16 Rkey string 17 Knot string 18 RepoOwner syntax.DID 19 RepoName string 20 TriggerId int 21 Sha string 22 Created time.Time 23 24 // populate when querying for reverse mappings 25 Trigger *Trigger 26 Statuses map[string]WorkflowStatus 27} 28 29type WorkflowStatus struct { 30 data []PipelineStatus 31} 32 33func (w WorkflowStatus) Latest() PipelineStatus { 34 return w.data[len(w.data)-1] 35} 36 37// time taken by this workflow to reach an "end state" 38func (w WorkflowStatus) TimeTaken() time.Duration { 39 var start, end *time.Time 40 for _, s := range w.data { 41 if s.Status.IsStart() { 42 start = &s.Created 43 } 44 if s.Status.IsFinish() { 45 end = &s.Created 46 } 47 } 48 49 if start != nil && end != nil && end.After(*start) { 50 return end.Sub(*start) 51 } 52 53 return 0 54} 55 56func (p Pipeline) Counts() map[string]int { 57 m := make(map[string]int) 58 for _, w := range p.Statuses { 59 m[w.Latest().Status.String()] += 1 60 } 61 return m 62} 63 64func (p Pipeline) TimeTaken() time.Duration { 65 var s time.Duration 66 for _, w := range p.Statuses { 67 s += w.TimeTaken() 68 } 69 return s 70} 71 72func (p Pipeline) Workflows() []string { 73 var ws []string 74 for v := range p.Statuses { 75 ws = append(ws, v) 76 } 77 slices.Sort(ws) 78 return ws 79} 80 81type Trigger struct { 82 Id int 83 Kind string 84 85 // push trigger fields 86 PushRef *string 87 PushNewSha *string 88 PushOldSha *string 89 90 // pull request trigger fields 91 PRSourceBranch *string 92 PRTargetBranch *string 93 PRSourceSha *string 94 PRAction *string 95} 96 97func (t *Trigger) IsPush() bool { 98 return t != nil && t.Kind == "push" 99} 100 101func (t *Trigger) IsPullRequest() bool { 102 return t != nil && t.Kind == "pull_request" 103} 104 
105func (t *Trigger) TargetRef() string { 106 if t.IsPush() { 107 return plumbing.ReferenceName(*t.PushRef).Short() 108 } else if t.IsPullRequest() { 109 return *t.PRTargetBranch 110 } 111 112 return "" 113} 114 115type PipelineStatus struct { 116 ID int 117 Spindle string 118 Rkey string 119 PipelineKnot string 120 PipelineRkey string 121 Created time.Time 122 Workflow string 123 Status spindle.StatusKind 124 Error *string 125 ExitCode int 126} 127 128func GetPipelines(e Execer, filters ...filter) ([]Pipeline, error) { 129 var pipelines []Pipeline 130 131 var conditions []string 132 var args []any 133 for _, filter := range filters { 134 conditions = append(conditions, filter.Condition()) 135 args = append(args, filter.Arg()...) 136 } 137 138 whereClause := "" 139 if conditions != nil { 140 whereClause = " where " + strings.Join(conditions, " and ") 141 } 142 143 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause) 144 145 rows, err := e.Query(query, args...) 146 147 if err != nil { 148 return nil, err 149 } 150 defer rows.Close() 151 152 for rows.Next() { 153 var pipeline Pipeline 154 var createdAt string 155 err = rows.Scan( 156 &pipeline.Id, 157 &pipeline.Rkey, 158 &pipeline.Knot, 159 &pipeline.RepoOwner, 160 &pipeline.RepoName, 161 &pipeline.Sha, 162 &createdAt, 163 ) 164 if err != nil { 165 return nil, err 166 } 167 168 if t, err := time.Parse(time.RFC3339, createdAt); err == nil { 169 pipeline.Created = t 170 } 171 172 pipelines = append(pipelines, pipeline) 173 } 174 175 if err = rows.Err(); err != nil { 176 return nil, err 177 } 178 179 return pipelines, nil 180} 181 182func AddPipeline(e Execer, pipeline Pipeline) error { 183 args := []any{ 184 pipeline.Rkey, 185 pipeline.Knot, 186 pipeline.RepoOwner, 187 pipeline.RepoName, 188 pipeline.TriggerId, 189 pipeline.Sha, 190 } 191 192 placeholders := make([]string, len(args)) 193 for i := range placeholders { 194 placeholders[i] = "?" 
195 } 196 197 query := fmt.Sprintf(` 198 insert or ignore into pipelines ( 199 rkey, 200 knot, 201 repo_owner, 202 repo_name, 203 trigger_id, 204 sha 205 ) values (%s) 206 `, strings.Join(placeholders, ",")) 207 208 _, err := e.Exec(query, args...) 209 210 return err 211} 212 213func AddTrigger(e Execer, trigger Trigger) (int64, error) { 214 args := []any{ 215 trigger.Kind, 216 trigger.PushRef, 217 trigger.PushNewSha, 218 trigger.PushOldSha, 219 trigger.PRSourceBranch, 220 trigger.PRTargetBranch, 221 trigger.PRSourceSha, 222 trigger.PRAction, 223 } 224 225 placeholders := make([]string, len(args)) 226 for i := range placeholders { 227 placeholders[i] = "?" 228 } 229 230 query := fmt.Sprintf(`insert or ignore into triggers ( 231 kind, 232 push_ref, 233 push_new_sha, 234 push_old_sha, 235 pr_source_branch, 236 pr_target_branch, 237 pr_source_sha, 238 pr_action 239 ) values (%s)`, strings.Join(placeholders, ",")) 240 241 res, err := e.Exec(query, args...) 242 if err != nil { 243 return 0, err 244 } 245 246 return res.LastInsertId() 247} 248 249func AddPipelineStatus(e Execer, status PipelineStatus) error { 250 args := []any{ 251 status.Spindle, 252 status.Rkey, 253 status.PipelineKnot, 254 status.PipelineRkey, 255 status.Workflow, 256 status.Status, 257 status.Error, 258 status.ExitCode, 259 } 260 261 placeholders := make([]string, len(args)) 262 for i := range placeholders { 263 placeholders[i] = "?" 264 } 265 266 query := fmt.Sprintf(` 267 insert or ignore into pipeline_statuses ( 268 spindle, 269 rkey, 270 pipeline_knot, 271 pipeline_rkey, 272 workflow, 273 status, 274 error, 275 exit_code 276 ) values (%s) 277 `, strings.Join(placeholders, ",")) 278 279 _, err := e.Exec(query, args...) 
280 return err 281} 282 283// this is a mega query, but the most useful one: 284// get N pipelines, for each one get the latest status of its N workflows 285func GetPipelineStatuses(e Execer, filters ...filter) ([]Pipeline, error) { 286 var conditions []string 287 var args []any 288 for _, filter := range filters { 289 filter.key = "p." + filter.key // the table is aliased in the query to `p` 290 conditions = append(conditions, filter.Condition()) 291 args = append(args, filter.Arg()...) 292 } 293 294 whereClause := "" 295 if conditions != nil { 296 whereClause = " where " + strings.Join(conditions, " and ") 297 } 298 299 query := fmt.Sprintf(` 300 select 301 p.id, 302 p.knot, 303 p.rkey, 304 p.repo_owner, 305 p.repo_name, 306 p.sha, 307 p.created, 308 t.id, 309 t.kind, 310 t.push_ref, 311 t.push_new_sha, 312 t.push_old_sha, 313 t.pr_source_branch, 314 t.pr_target_branch, 315 t.pr_source_sha, 316 t.pr_action 317 from 318 pipelines p 319 join 320 triggers t ON p.trigger_id = t.id 321 %s 322 `, whereClause) 323 324 rows, err := e.Query(query, args...) 
325 if err != nil { 326 return nil, err 327 } 328 defer rows.Close() 329 330 pipelines := make(map[string]Pipeline) 331 for rows.Next() { 332 var p Pipeline 333 var t Trigger 334 var created string 335 336 err := rows.Scan( 337 &p.Id, 338 &p.Knot, 339 &p.Rkey, 340 &p.RepoOwner, 341 &p.RepoName, 342 &p.Sha, 343 &created, 344 &p.TriggerId, 345 &t.Kind, 346 &t.PushRef, 347 &t.PushNewSha, 348 &t.PushOldSha, 349 &t.PRSourceBranch, 350 &t.PRTargetBranch, 351 &t.PRSourceSha, 352 &t.PRAction, 353 ) 354 if err != nil { 355 return nil, err 356 } 357 358 // Parse created time manually 359 p.Created, err = time.Parse(time.RFC3339, created) 360 if err != nil { 361 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err) 362 } 363 364 // Link trigger to pipeline 365 t.Id = p.TriggerId 366 p.Trigger = &t 367 p.Statuses = make(map[string]WorkflowStatus) 368 369 k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey) 370 pipelines[k] = p 371 } 372 373 // get all statuses 374 // the where clause here is of the form: 375 // 376 // where (pipeline_knot = k1 and pipeline_rkey = r1) 377 // or (pipeline_knot = k2 and pipeline_rkey = r2) 378 conditions = nil 379 args = nil 380 for _, p := range pipelines { 381 knotFilter := FilterEq("pipeline_knot", p.Knot) 382 rkeyFilter := FilterEq("pipeline_rkey", p.Rkey) 383 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition())) 384 args = append(args, p.Knot) 385 args = append(args, p.Rkey) 386 } 387 whereClause = "" 388 if conditions != nil { 389 whereClause = "where " + strings.Join(conditions, " or ") 390 } 391 query = fmt.Sprintf(` 392 select 393 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code 394 from 395 pipeline_statuses 396 %s 397 `, whereClause) 398 399 rows, err = e.Query(query, args...) 
400 if err != nil { 401 return nil, err 402 } 403 defer rows.Close() 404 405 for rows.Next() { 406 var ps PipelineStatus 407 var created string 408 409 err := rows.Scan( 410 &ps.ID, 411 &ps.Spindle, 412 &ps.Rkey, 413 &ps.PipelineKnot, 414 &ps.PipelineRkey, 415 &created, 416 &ps.Workflow, 417 &ps.Status, 418 &ps.Error, 419 &ps.ExitCode, 420 ) 421 if err != nil { 422 return nil, err 423 } 424 425 ps.Created, err = time.Parse(time.RFC3339, created) 426 if err != nil { 427 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err) 428 } 429 430 key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey) 431 432 // extract 433 pipeline, ok := pipelines[key] 434 if !ok { 435 continue 436 } 437 statuses, _ := pipeline.Statuses[ps.Workflow] 438 if !ok { 439 pipeline.Statuses[ps.Workflow] = WorkflowStatus{} 440 } 441 442 // append 443 statuses.data = append(statuses.data, ps) 444 445 // reassign 446 pipeline.Statuses[ps.Workflow] = statuses 447 pipelines[key] = pipeline 448 } 449 450 var all []Pipeline 451 for _, p := range pipelines { 452 for _, s := range p.Statuses { 453 slices.SortFunc(s.data, func(a, b PipelineStatus) int { 454 if a.Created.After(b.Created) { 455 return 1 456 } 457 return -1 458 }) 459 } 460 all = append(all, p) 461 } 462 463 // sort pipelines by date 464 slices.SortFunc(all, func(a, b Pipeline) int { 465 if a.Created.After(b.Created) { 466 return -1 467 } 468 return 1 469 }) 470 471 return all, nil 472}