From 3a36b3f469bc33b955307726383f8a620e4c0665 Mon Sep 17 00:00:00 2001 From: oppiliappan Date: Wed, 18 Jun 2025 11:14:07 +0100 Subject: [PATCH] appview/db: add query to fetch pipeline statuses Change-Id: kysslsltlnopqrsprruovvnnukrwlzqq Signed-off-by: oppiliappan --- appview/db/artifact.go | 4 +- appview/db/db.go | 39 +++++++ appview/db/pipeline.go | 239 +++++++++++++++++++++++++++++++++++++++- appview/db/pulls.go | 6 +- appview/db/punchcard.go | 2 +- appview/db/spindle.go | 6 +- 6 files changed, 282 insertions(+), 14 deletions(-) diff --git a/appview/db/artifact.go b/appview/db/artifact.go index 51aae7a..8f8d5d1 100644 --- a/appview/db/artifact.go +++ b/appview/db/artifact.go @@ -64,7 +64,7 @@ func GetArtifact(e Execer, filters ...filter) ([]Artifact, error) { var args []any for _, filter := range filters { conditions = append(conditions, filter.Condition()) - args = append(args, filter.arg) + args = append(args, filter.Arg()...) } whereClause := "" @@ -135,7 +135,7 @@ func DeleteArtifact(e Execer, filters ...filter) error { var args []any for _, filter := range filters { conditions = append(conditions, filter.Condition()) - args = append(args, filter.arg) + args = append(args, filter.Arg()...) } whereClause := "" diff --git a/appview/db/db.go b/appview/db/db.go index 5f86714..4a966ed 100644 --- a/appview/db/db.go +++ b/appview/db/db.go @@ -5,6 +5,8 @@ import ( "database/sql" "fmt" "log" + "reflect" + "strings" _ "github.com/mattn/go-sqlite3" ) @@ -342,6 +344,7 @@ func Make(dbPath string) (*DB, error) { -- every pipeline must be associated with exactly one commit sha text not null check (length(sha) = 40), + created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), -- trigger data trigger_id integer not null, @@ -600,7 +603,43 @@ func FilterGte(key string, arg any) filter { return newFilter(key, ">=", arg) func FilterLte(key string, arg any) filter { return newFilter(key, "<=", arg) } func FilterIs(key string, arg any) filter { return newFilter(key, "is", arg) } func FilterIsNot(key string, arg any) filter { return newFilter(key, "is not", arg) } +func FilterIn(key string, arg any) filter { return newFilter(key, "in", arg) } func (f filter) Condition() string { + rv := reflect.ValueOf(f.arg) + kind := rv.Kind() + + // if we have `FilterIn(k, [1, 2, 3])`, compile it down to `k in (?, ?, ?)` + if kind == reflect.Slice || kind == reflect.Array { + if rv.Len() == 0 { + panic(fmt.Sprintf("empty slice passed to %q filter on %s", f.cmp, f.key)) + } + + placeholders := make([]string, rv.Len()) + for i := range placeholders { + placeholders[i] = "?" + } + + return fmt.Sprintf("%s %s (%s)", f.key, f.cmp, strings.Join(placeholders, ", ")) + } + return fmt.Sprintf("%s %s ?", f.key, f.cmp) } + +func (f filter) Arg() []any { + rv := reflect.ValueOf(f.arg) + kind := rv.Kind() + if kind == reflect.Slice || kind == reflect.Array { + if rv.Len() == 0 { + panic(fmt.Sprintf("empty slice passed to %q filter on %s", f.cmp, f.key)) + } + + out := make([]any, rv.Len()) + for i := range rv.Len() { + out[i] = rv.Index(i).Interface() + } + return out + } + + return []any{f.arg} +} diff --git a/appview/db/pipeline.go b/appview/db/pipeline.go index 43a05b0..e845f60 100644 --- a/appview/db/pipeline.go +++ b/appview/db/pipeline.go @@ -2,10 +2,12 @@ package db import ( "fmt" + "slices" "strings" "time" "github.com/bluesky-social/indigo/atproto/syntax" + spindle "tangled.sh/tangled.sh/core/spindle/models" ) type Pipeline struct { @@ -16,9 +18,46 @@ type Pipeline struct { RepoName string TriggerId int Sha string + Created time.Time - // populate when querying for revers mappings - Trigger *Trigger + // populate when querying for reverse mappings + Trigger *Trigger + Statuses map[string]WorkflowStatus +} + +type WorkflowStatus struct { + data []PipelineStatus +} + +func (w WorkflowStatus) Latest() PipelineStatus { + return w.data[len(w.data)-1] +} + +// time taken by this workflow to reach an "end state" +func (w WorkflowStatus) TimeTaken() time.Duration { + var start, end *time.Time + for _, s := range w.data { + if s.Status.IsStart() { + start = &s.Created + } + if s.Status.IsFinish() { + end = &s.Created + } + } + + if start != nil && end != nil && end.After(*start) { + return end.Sub(*start) + } + + return 0 +} + +func (p Pipeline) Counts() map[string]int { + m := make(map[string]int) + for _, w := range p.Statuses { + m[w.Latest().Status.String()] += 1 + } + return m } type Trigger struct { @@ -45,7 +84,7 @@ type PipelineStatus struct { PipelineRkey string Created time.Time Workflow string - Status string + Status spindle.StatusKind Error *string ExitCode int } @@ -57,7 +96,7 @@ func GetPipelines(e Execer, filters ...filter) ([]Pipeline, error) { var args []any for _, filter := range filters { conditions = append(conditions, filter.Condition()) - args = append(args, filter.arg) + args = append(args, filter.Arg()...) } whereClause := "" @@ -65,7 +104,7 @@ func GetPipelines(e Execer, filters ...filter) ([]Pipeline, error) { whereClause = " where " + strings.Join(conditions, " and ") } - query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha from pipelines %s`, whereClause) + query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause) rows, err := e.Query(query, args...) @@ -76,6 +115,7 @@ func GetPipelines(e Execer, filters ...filter) ([]Pipeline, error) { for rows.Next() { var pipeline Pipeline + var createdAt string err = rows.Scan( &pipeline.Id, &pipeline.Rkey, @@ -83,11 +123,16 @@ func GetPipelines(e Execer, filters ...filter) ([]Pipeline, error) { &pipeline.RepoOwner, &pipeline.RepoName, &pipeline.Sha, + &createdAt, ) if err != nil { return nil, err } + if t, err := time.Parse(time.RFC3339, createdAt); err == nil { + pipeline.Created = t + } + pipelines = append(pipelines, pipeline) } @@ -198,3 +243,187 @@ func AddPipelineStatus(e Execer, status PipelineStatus) error { _, err := e.Exec(query, args...) return err } + +// this is a mega query, but the most useful one: +// get N pipelines, for each one get the latest status of its N workflows +func GetPipelineStatuses(e Execer, filters ...filter) ([]Pipeline, error) { + var conditions []string + var args []any + for _, filter := range filters { + filter.key = "p." + filter.key // the table is aliased in the query to `p` + conditions = append(conditions, filter.Condition()) + args = append(args, filter.Arg()...) + } + + whereClause := "" + if conditions != nil { + whereClause = " where " + strings.Join(conditions, " and ") + } + + query := fmt.Sprintf(` + select + p.id AS pipeline_id, + p.knot, + p.rkey, + p.repo_owner, + p.repo_name, + p.sha, + p.created, + t.id AS trigger_id, + t.kind, + t.push_ref, + t.push_new_sha, + t.push_old_sha, + t.pr_source_branch, + t.pr_target_branch, + t.pr_source_sha, + t.pr_action + from + pipelines p + join + triggers t ON p.trigger_id = t.id + %s + order by p.created desc + `, whereClause) + + rows, err := e.Query(query, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + pipelines := make(map[string]Pipeline) + for rows.Next() { + var p Pipeline + var t Trigger + var created string + + err := rows.Scan( + &p.Id, + &p.Knot, + &p.Rkey, + &p.RepoOwner, + &p.RepoName, + &p.Sha, + &created, + &p.TriggerId, + &t.Kind, + &t.PushRef, + &t.PushNewSha, + &t.PushOldSha, + &t.PRSourceBranch, + &t.PRTargetBranch, + &t.PRSourceSha, + &t.PRAction, + ) + if err != nil { + return nil, err + } + + // Parse created time manually + p.Created, err = time.Parse(time.RFC3339, created) + if err != nil { + return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err) + } + + // Link trigger to pipeline + t.Id = p.TriggerId + p.Trigger = &t + p.Statuses = make(map[string]WorkflowStatus) + + k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey) + pipelines[k] = p + } + + // get all statuses + // the where clause here is of the form: + // + // where (pipeline_knot = k1 and pipeline_rkey = r1) + // or (pipeline_knot = k2 and pipeline_rkey = r2) + conditions = nil + args = nil + for _, p := range pipelines { + knotFilter := FilterEq("pipeline_knot", p.Knot) + rkeyFilter := FilterEq("pipeline_rkey", p.Rkey) + conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition())) + args = append(args, p.Knot) + args = append(args, p.Rkey) + } + whereClause = "" + if conditions != nil { + whereClause = "where " + strings.Join(conditions, " or ") + } + query = fmt.Sprintf(` + select + id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code + from + pipeline_statuses + %s + `, whereClause) + + rows, err = e.Query(query, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var ps PipelineStatus + var created string + + err := rows.Scan( + &ps.ID, + &ps.Spindle, + &ps.Rkey, + &ps.PipelineKnot, + &ps.PipelineRkey, + &created, + &ps.Workflow, + &ps.Status, + &ps.Error, + &ps.ExitCode, + ) + if err != nil { + return nil, err + } + + ps.Created, err = time.Parse(time.RFC3339, created) + if err != nil { + return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err) + } + + key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey) + + // extract + pipeline, ok := pipelines[key] + if !ok { + continue + } + statuses, _ := pipeline.Statuses[ps.Workflow] + if !ok { + pipeline.Statuses[ps.Workflow] = WorkflowStatus{} + } + + // append + statuses.data = append(statuses.data, ps) + + // reassign + pipeline.Statuses[ps.Workflow] = statuses + pipelines[key] = pipeline + } + + var all []Pipeline + for _, p := range pipelines { + for _, s := range p.Statuses { + slices.SortFunc(s.data, func(a, b PipelineStatus) int { + if a.Created.After(b.Created) { + return 1 + } + return -1 + }) + } + all = append(all, p) + } + + return all, nil +} diff --git a/appview/db/pulls.go b/appview/db/pulls.go index 7d045f9..feb4196 100644 --- a/appview/db/pulls.go +++ b/appview/db/pulls.go @@ -311,7 +311,7 @@ func GetPulls(e Execer, filters ...filter) ([]*Pull, error) { var args []any for _, filter := range filters { conditions = append(conditions, filter.Condition()) - args = append(args, filter.arg) + args = append(args, filter.Arg()...) } whereClause := "" @@ -866,7 +866,7 @@ func SetPullParentChangeId(e Execer, parentChangeId string, filters ...filter) e for _, filter := range filters { conditions = append(conditions, filter.Condition()) - args = append(args, filter.arg) + args = append(args, filter.Arg()...) } whereClause := "" @@ -891,7 +891,7 @@ func UpdatePull(e Execer, newPatch, sourceRev string, filters ...filter) error { for _, filter := range filters { conditions = append(conditions, filter.Condition()) - args = append(args, filter.arg) + args = append(args, filter.Arg()...) } whereClause := "" diff --git a/appview/db/punchcard.go b/appview/db/punchcard.go index 0569068..62a386e 100644 --- a/appview/db/punchcard.go +++ b/appview/db/punchcard.go @@ -45,7 +45,7 @@ func MakePunchcard(e Execer, filters ...filter) (Punchcard, error) { var args []any for _, filter := range filters { conditions = append(conditions, filter.Condition()) - args = append(args, filter.arg) + args = append(args, filter.Arg()...) } whereClause := "" diff --git a/appview/db/spindle.go b/appview/db/spindle.go index 0865de7..bf83c04 100644 --- a/appview/db/spindle.go +++ b/appview/db/spindle.go @@ -24,7 +24,7 @@ func GetSpindles(e Execer, filters ...filter) ([]Spindle, error) { var args []any for _, filter := range filters { conditions = append(conditions, filter.Condition()) - args = append(args, filter.arg) + args = append(args, filter.Arg()...) } whereClause := "" @@ -98,7 +98,7 @@ func VerifySpindle(e Execer, filters ...filter) (int64, error) { var args []any for _, filter := range filters { conditions = append(conditions, filter.Condition()) - args = append(args, filter.arg) + args = append(args, filter.Arg()...) } whereClause := "" @@ -121,7 +121,7 @@ func DeleteSpindle(e Execer, filters ...filter) error { var args []any for _, filter := range filters { conditions = append(conditions, filter.Condition()) - args = append(args, filter.arg) + args = append(args, filter.Arg()...) } whereClause := "" -- 2.43.0