forked from tangled.org/core
this repo has no description
1package db 2 3import ( 4 "fmt" 5 "slices" 6 "strings" 7 "time" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "github.com/go-git/go-git/v5/plumbing" 11 spindle "tangled.sh/tangled.sh/core/spindle/models" 12 "tangled.sh/tangled.sh/core/workflow" 13) 14 15type Pipeline struct { 16 Id int 17 Rkey string 18 Knot string 19 RepoOwner syntax.DID 20 RepoName string 21 TriggerId int 22 Sha string 23 Created time.Time 24 25 // populate when querying for reverse mappings 26 Trigger *Trigger 27 Statuses map[string]WorkflowStatus 28} 29 30type WorkflowStatus struct { 31 Data []PipelineStatus 32} 33 34func (w WorkflowStatus) Latest() PipelineStatus { 35 return w.Data[len(w.Data)-1] 36} 37 38// time taken by this workflow to reach an "end state" 39func (w WorkflowStatus) TimeTaken() time.Duration { 40 var start, end *time.Time 41 for _, s := range w.Data { 42 if s.Status.IsStart() { 43 start = &s.Created 44 } 45 if s.Status.IsFinish() { 46 end = &s.Created 47 } 48 } 49 50 if start != nil && end != nil && end.After(*start) { 51 return end.Sub(*start) 52 } 53 54 return 0 55} 56 57func (p Pipeline) Counts() map[string]int { 58 m := make(map[string]int) 59 for _, w := range p.Statuses { 60 m[w.Latest().Status.String()] += 1 61 } 62 return m 63} 64 65func (p Pipeline) TimeTaken() time.Duration { 66 var s time.Duration 67 for _, w := range p.Statuses { 68 s += w.TimeTaken() 69 } 70 return s 71} 72 73func (p Pipeline) Workflows() []string { 74 var ws []string 75 for v := range p.Statuses { 76 ws = append(ws, v) 77 } 78 slices.Sort(ws) 79 return ws 80} 81 82// if we know that a spindle has picked up this pipeline, then it is Responding 83func (p Pipeline) IsResponding() bool { 84 return len(p.Statuses) != 0 85} 86 87type Trigger struct { 88 Id int 89 Kind workflow.TriggerKind 90 91 // push trigger fields 92 PushRef *string 93 PushNewSha *string 94 PushOldSha *string 95 96 // pull request trigger fields 97 PRSourceBranch *string 98 PRTargetBranch *string 99 PRSourceSha *string 100 PRAction *string 101} 102 103func (t *Trigger) IsPush() bool { 104 return t != nil && t.Kind == workflow.TriggerKindPush 105} 106 107func (t *Trigger) IsPullRequest() bool { 108 return t != nil && t.Kind == workflow.TriggerKindPullRequest 109} 110 111func (t *Trigger) TargetRef() string { 112 if t.IsPush() { 113 return plumbing.ReferenceName(*t.PushRef).Short() 114 } else if t.IsPullRequest() { 115 return *t.PRTargetBranch 116 } 117 118 return "" 119} 120 121type PipelineStatus struct { 122 ID int 123 Spindle string 124 Rkey string 125 PipelineKnot string 126 PipelineRkey string 127 Created time.Time 128 Workflow string 129 Status spindle.StatusKind 130 Error *string 131 ExitCode int 132} 133 134func GetPipelines(e Execer, filters ...filter) ([]Pipeline, error) { 135 var pipelines []Pipeline 136 137 var conditions []string 138 var args []any 139 for _, filter := range filters { 140 conditions = append(conditions, filter.Condition()) 141 args = append(args, filter.Arg()...) 142 } 143 144 whereClause := "" 145 if conditions != nil { 146 whereClause = " where " + strings.Join(conditions, " and ") 147 } 148 149 query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause) 150 151 rows, err := e.Query(query, args...) 152 153 if err != nil { 154 return nil, err 155 } 156 defer rows.Close() 157 158 for rows.Next() { 159 var pipeline Pipeline 160 var createdAt string 161 err = rows.Scan( 162 &pipeline.Id, 163 &pipeline.Rkey, 164 &pipeline.Knot, 165 &pipeline.RepoOwner, 166 &pipeline.RepoName, 167 &pipeline.Sha, 168 &createdAt, 169 ) 170 if err != nil { 171 return nil, err 172 } 173 174 if t, err := time.Parse(time.RFC3339, createdAt); err == nil { 175 pipeline.Created = t 176 } 177 178 pipelines = append(pipelines, pipeline) 179 } 180 181 if err = rows.Err(); err != nil { 182 return nil, err 183 } 184 185 return pipelines, nil 186} 187 188func AddPipeline(e Execer, pipeline Pipeline) error { 189 args := []any{ 190 pipeline.Rkey, 191 pipeline.Knot, 192 pipeline.RepoOwner, 193 pipeline.RepoName, 194 pipeline.TriggerId, 195 pipeline.Sha, 196 } 197 198 placeholders := make([]string, len(args)) 199 for i := range placeholders { 200 placeholders[i] = "?" 201 } 202 203 query := fmt.Sprintf(` 204 insert or ignore into pipelines ( 205 rkey, 206 knot, 207 repo_owner, 208 repo_name, 209 trigger_id, 210 sha 211 ) values (%s) 212 `, strings.Join(placeholders, ",")) 213 214 _, err := e.Exec(query, args...) 215 216 return err 217} 218 219func AddTrigger(e Execer, trigger Trigger) (int64, error) { 220 args := []any{ 221 trigger.Kind, 222 trigger.PushRef, 223 trigger.PushNewSha, 224 trigger.PushOldSha, 225 trigger.PRSourceBranch, 226 trigger.PRTargetBranch, 227 trigger.PRSourceSha, 228 trigger.PRAction, 229 } 230 231 placeholders := make([]string, len(args)) 232 for i := range placeholders { 233 placeholders[i] = "?" 234 } 235 236 query := fmt.Sprintf(`insert or ignore into triggers ( 237 kind, 238 push_ref, 239 push_new_sha, 240 push_old_sha, 241 pr_source_branch, 242 pr_target_branch, 243 pr_source_sha, 244 pr_action 245 ) values (%s)`, strings.Join(placeholders, ",")) 246 247 res, err := e.Exec(query, args...) 248 if err != nil { 249 return 0, err 250 } 251 252 return res.LastInsertId() 253} 254 255func AddPipelineStatus(e Execer, status PipelineStatus) error { 256 args := []any{ 257 status.Spindle, 258 status.Rkey, 259 status.PipelineKnot, 260 status.PipelineRkey, 261 status.Workflow, 262 status.Status, 263 status.Error, 264 status.ExitCode, 265 status.Created.Format(time.RFC3339), 266 } 267 268 placeholders := make([]string, len(args)) 269 for i := range placeholders { 270 placeholders[i] = "?" 271 } 272 273 query := fmt.Sprintf(` 274 insert or ignore into pipeline_statuses ( 275 spindle, 276 rkey, 277 pipeline_knot, 278 pipeline_rkey, 279 workflow, 280 status, 281 error, 282 exit_code, 283 created 284 ) values (%s) 285 `, strings.Join(placeholders, ",")) 286 287 _, err := e.Exec(query, args...) 288 return err 289} 290 291// this is a mega query, but the most useful one: 292// get N pipelines, for each one get the latest status of its N workflows 293func GetPipelineStatuses(e Execer, filters ...filter) ([]Pipeline, error) { 294 var conditions []string 295 var args []any 296 for _, filter := range filters { 297 filter.key = "p." + filter.key // the table is aliased in the query to `p` 298 conditions = append(conditions, filter.Condition()) 299 args = append(args, filter.Arg()...) 300 } 301 302 whereClause := "" 303 if conditions != nil { 304 whereClause = " where " + strings.Join(conditions, " and ") 305 } 306 307 query := fmt.Sprintf(` 308 select 309 p.id, 310 p.knot, 311 p.rkey, 312 p.repo_owner, 313 p.repo_name, 314 p.sha, 315 p.created, 316 t.id, 317 t.kind, 318 t.push_ref, 319 t.push_new_sha, 320 t.push_old_sha, 321 t.pr_source_branch, 322 t.pr_target_branch, 323 t.pr_source_sha, 324 t.pr_action 325 from 326 pipelines p 327 join 328 triggers t ON p.trigger_id = t.id 329 %s 330 `, whereClause) 331 332 rows, err := e.Query(query, args...) 333 if err != nil { 334 return nil, err 335 } 336 defer rows.Close() 337 338 pipelines := make(map[string]Pipeline) 339 for rows.Next() { 340 var p Pipeline 341 var t Trigger 342 var created string 343 344 err := rows.Scan( 345 &p.Id, 346 &p.Knot, 347 &p.Rkey, 348 &p.RepoOwner, 349 &p.RepoName, 350 &p.Sha, 351 &created, 352 &p.TriggerId, 353 &t.Kind, 354 &t.PushRef, 355 &t.PushNewSha, 356 &t.PushOldSha, 357 &t.PRSourceBranch, 358 &t.PRTargetBranch, 359 &t.PRSourceSha, 360 &t.PRAction, 361 ) 362 if err != nil { 363 return nil, err 364 } 365 366 p.Created, err = time.Parse(time.RFC3339, created) 367 if err != nil { 368 return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err) 369 } 370 371 t.Id = p.TriggerId 372 p.Trigger = &t 373 p.Statuses = make(map[string]WorkflowStatus) 374 375 k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey) 376 pipelines[k] = p 377 } 378 379 // get all statuses 380 // the where clause here is of the form: 381 // 382 // where (pipeline_knot = k1 and pipeline_rkey = r1) 383 // or (pipeline_knot = k2 and pipeline_rkey = r2) 384 conditions = nil 385 args = nil 386 for _, p := range pipelines { 387 knotFilter := FilterEq("pipeline_knot", p.Knot) 388 rkeyFilter := FilterEq("pipeline_rkey", p.Rkey) 389 conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition())) 390 args = append(args, p.Knot) 391 args = append(args, p.Rkey) 392 } 393 whereClause = "" 394 if conditions != nil { 395 whereClause = "where " + strings.Join(conditions, " or ") 396 } 397 query = fmt.Sprintf(` 398 select 399 id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code 400 from 401 pipeline_statuses 402 %s 403 `, whereClause) 404 405 rows, err = e.Query(query, args...) 406 if err != nil { 407 return nil, err 408 } 409 defer rows.Close() 410 411 for rows.Next() { 412 var ps PipelineStatus 413 var created string 414 415 err := rows.Scan( 416 &ps.ID, 417 &ps.Spindle, 418 &ps.Rkey, 419 &ps.PipelineKnot, 420 &ps.PipelineRkey, 421 &created, 422 &ps.Workflow, 423 &ps.Status, 424 &ps.Error, 425 &ps.ExitCode, 426 ) 427 if err != nil { 428 return nil, err 429 } 430 431 ps.Created, err = time.Parse(time.RFC3339, created) 432 if err != nil { 433 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err) 434 } 435 436 key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey) 437 438 // extract 439 pipeline, ok := pipelines[key] 440 if !ok { 441 continue 442 } 443 statuses, _ := pipeline.Statuses[ps.Workflow] 444 if !ok { 445 pipeline.Statuses[ps.Workflow] = WorkflowStatus{} 446 } 447 448 // append 449 statuses.Data = append(statuses.Data, ps) 450 451 // reassign 452 pipeline.Statuses[ps.Workflow] = statuses 453 pipelines[key] = pipeline 454 } 455 456 var all []Pipeline 457 for _, p := range pipelines { 458 for _, s := range p.Statuses { 459 slices.SortFunc(s.Data, func(a, b PipelineStatus) int { 460 if a.Created.After(b.Created) { 461 return 1 462 } 463 if a.Created.Before(b.Created) { 464 return -1 465 } 466 if a.ID > b.ID { 467 return 1 468 } 469 if a.ID < b.ID { 470 return -1 471 } 472 return 0 473 }) 474 } 475 all = append(all, p) 476 } 477 478 // sort pipelines by date 479 slices.SortFunc(all, func(a, b Pipeline) int { 480 if a.Created.After(b.Created) { 481 return -1 482 } 483 return 1 484 }) 485 486 return all, nil 487}