appview/db: add query to fetch pipeline statuses #253

merged
opened by oppi.li targeting master from push-mwkwusmyymno
+2 -2
appview/db/artifact.go
···
var args []any
for _, filter := range filters {
conditions = append(conditions, filter.Condition())
-
args = append(args, filter.arg)
+
args = append(args, filter.Arg()...)
}
whereClause := ""
···
var args []any
for _, filter := range filters {
conditions = append(conditions, filter.Condition())
-
args = append(args, filter.arg)
+
args = append(args, filter.Arg()...)
}
whereClause := ""
+39
appview/db/db.go
···
"database/sql"
"fmt"
"log"
+
"reflect"
+
"strings"
_ "github.com/mattn/go-sqlite3"
)
···
-- every pipeline must be associated with exactly one commit
sha text not null check (length(sha) = 40),
+
created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
-- trigger data
trigger_id integer not null,
···
func FilterLte(key string, arg any) filter { return newFilter(key, "<=", arg) }
func FilterIs(key string, arg any) filter { return newFilter(key, "is", arg) }
func FilterIsNot(key string, arg any) filter { return newFilter(key, "is not", arg) }
+
func FilterIn(key string, arg any) filter { return newFilter(key, "in", arg) }
func (f filter) Condition() string {
+
rv := reflect.ValueOf(f.arg)
+
kind := rv.Kind()
+
+
// if we have `FilterIn(k, [1, 2, 3])`, compile it down to `k in (?, ?, ?)`
+
if kind == reflect.Slice || kind == reflect.Array {
+
if rv.Len() == 0 {
+
panic(fmt.Sprintf("empty slice passed to %q filter on %s", f.cmp, f.key))
+
}
+
+
placeholders := make([]string, rv.Len())
+
for i := range placeholders {
+
placeholders[i] = "?"
+
}
+
+
return fmt.Sprintf("%s %s (%s)", f.key, f.cmp, strings.Join(placeholders, ", "))
+
}
+
return fmt.Sprintf("%s %s ?", f.key, f.cmp)
}
+
+
func (f filter) Arg() []any {
+
rv := reflect.ValueOf(f.arg)
+
kind := rv.Kind()
+
if kind == reflect.Slice || kind == reflect.Array {
+
if rv.Len() == 0 {
+
panic(fmt.Sprintf("empty slice passed to %q filter on %s", f.cmp, f.key))
+
}
+
+
out := make([]any, rv.Len())
+
for i := range rv.Len() {
+
out[i] = rv.Index(i).Interface()
+
}
+
return out
+
}
+
+
return []any{f.arg}
+
}
+234 -5
appview/db/pipeline.go
···
import (
"fmt"
+
"slices"
"strings"
"time"
"github.com/bluesky-social/indigo/atproto/syntax"
+
spindle "tangled.sh/tangled.sh/core/spindle/models"
)
type Pipeline struct {
···
RepoName string
TriggerId int
Sha string
+
Created time.Time
-
// populate when querying for revers mappings
-
Trigger *Trigger
+
// populate when querying for reverse mappings
+
Trigger *Trigger
+
Statuses map[string]WorkflowStatus
+
}
+
+
type WorkflowStatus struct {
+
data []PipelineStatus
+
}
+
+
func (w WorkflowStatus) Latest() PipelineStatus {
+
return w.data[len(w.data)-1]
+
}
+
+
// time taken by this workflow to reach an "end state"
+
func (w WorkflowStatus) TimeTaken() time.Duration {
+
var start, end *time.Time
+
for _, s := range w.data {
+
if s.Status.IsStart() {
+
start = &s.Created
+
}
+
if s.Status.IsFinish() {
+
end = &s.Created
+
}
+
}
+
+
if start != nil && end != nil && end.After(*start) {
+
return end.Sub(*start)
+
}
+
+
return 0
+
}
+
+
func (p Pipeline) Counts() map[string]int {
+
m := make(map[string]int)
+
for _, w := range p.Statuses {
+
m[w.Latest().Status.String()] += 1
+
}
+
return m
}
type Trigger struct {
···
PipelineRkey string
Created time.Time
Workflow string
-
Status string
+
Status spindle.StatusKind
Error *string
ExitCode int
}
···
var args []any
for _, filter := range filters {
conditions = append(conditions, filter.Condition())
-
args = append(args, filter.arg)
+
args = append(args, filter.Arg()...)
}
whereClause := ""
···
whereClause = " where " + strings.Join(conditions, " and ")
}
-
query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha from pipelines %s`, whereClause)
+
query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause)
rows, err := e.Query(query, args...)
···
for rows.Next() {
var pipeline Pipeline
+
var createdAt string
err = rows.Scan(
&pipeline.Id,
&pipeline.Rkey,
···
&pipeline.RepoOwner,
&pipeline.RepoName,
&pipeline.Sha,
+
&createdAt,
)
if err != nil {
return nil, err
}
+
if t, err := time.Parse(time.RFC3339, createdAt); err == nil {
+
pipeline.Created = t
+
}
+
pipelines = append(pipelines, pipeline)
}
···
_, err := e.Exec(query, args...)
return err
}
+
+
// this is a mega query, but the most useful one:
+
// get N pipelines, for each one get the latest status of its N workflows
+
func GetPipelineStatuses(e Execer, filters ...filter) ([]Pipeline, error) {
+
var conditions []string
+
var args []any
+
for _, filter := range filters {
+
filter.key = "p." + filter.key // the table is aliased in the query to `p`
+
conditions = append(conditions, filter.Condition())
+
args = append(args, filter.Arg()...)
+
}
+
+
whereClause := ""
+
if conditions != nil {
+
whereClause = " where " + strings.Join(conditions, " and ")
+
}
+
+
query := fmt.Sprintf(`
+
select
+
p.id AS pipeline_id,
+
p.knot,
+
p.rkey,
+
p.repo_owner,
+
p.repo_name,
+
p.sha,
+
p.created,
+
t.id AS trigger_id,
+
t.kind,
+
t.push_ref,
+
t.push_new_sha,
+
t.push_old_sha,
+
t.pr_source_branch,
+
t.pr_target_branch,
+
t.pr_source_sha,
+
t.pr_action
+
from
+
pipelines p
+
join
+
triggers t ON p.trigger_id = t.id
+
%s
+
order by p.created desc
+
`, whereClause)
+
+
rows, err := e.Query(query, args...)
+
if err != nil {
+
return nil, err
+
}
+
defer rows.Close()
+
+
pipelines := make(map[string]Pipeline)
+
for rows.Next() {
+
var p Pipeline
+
var t Trigger
+
var created string
+
+
err := rows.Scan(
+
&p.Id,
+
&p.Knot,
+
&p.Rkey,
+
&p.RepoOwner,
+
&p.RepoName,
+
&p.Sha,
+
&created,
+
&p.TriggerId,
+
&t.Kind,
+
&t.PushRef,
+
&t.PushNewSha,
+
&t.PushOldSha,
+
&t.PRSourceBranch,
+
&t.PRTargetBranch,
+
&t.PRSourceSha,
+
&t.PRAction,
+
)
+
if err != nil {
+
return nil, err
+
}
+
+
// Parse created time manually
+
p.Created, err = time.Parse(time.RFC3339, created)
+
if err != nil {
+
return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err)
+
}
+
+
// Link trigger to pipeline
+
t.Id = p.TriggerId
+
p.Trigger = &t
+
p.Statuses = make(map[string]WorkflowStatus)
+
+
k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey)
+
pipelines[k] = p
+
}
+
+
// get all statuses
+
// the where clause here is of the form:
+
//
+
// where (pipeline_knot = k1 and pipeline_rkey = r1)
+
// or (pipeline_knot = k2 and pipeline_rkey = r2)
+
conditions = nil
+
args = nil
+
for _, p := range pipelines {
+
knotFilter := FilterEq("pipeline_knot", p.Knot)
+
rkeyFilter := FilterEq("pipeline_rkey", p.Rkey)
+
conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition()))
+
args = append(args, p.Knot)
+
args = append(args, p.Rkey)
+
}
+
whereClause = ""
+
if conditions != nil {
+
whereClause = "where " + strings.Join(conditions, " or ")
+
}
+
query = fmt.Sprintf(`
+
select
+
id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code
+
from
+
pipeline_statuses
+
%s
+
`, whereClause)
+
+
rows, err = e.Query(query, args...)
+
if err != nil {
+
return nil, err
+
}
+
defer rows.Close()
+
+
for rows.Next() {
+
var ps PipelineStatus
+
var created string
+
+
err := rows.Scan(
+
&ps.ID,
+
&ps.Spindle,
+
&ps.Rkey,
+
&ps.PipelineKnot,
+
&ps.PipelineRkey,
+
&created,
+
&ps.Workflow,
+
&ps.Status,
+
&ps.Error,
+
&ps.ExitCode,
+
)
+
if err != nil {
+
return nil, err
+
}
+
+
ps.Created, err = time.Parse(time.RFC3339, created)
+
if err != nil {
+
return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err)
+
}
+
+
key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey)
+
+
// extract
+
pipeline, ok := pipelines[key]
+
if !ok {
+
continue
+
}
+
statuses, _ := pipeline.Statuses[ps.Workflow]
+
if !ok {
+
pipeline.Statuses[ps.Workflow] = WorkflowStatus{}
+
}
+
+
// append
+
statuses.data = append(statuses.data, ps)
+
+
// reassign
+
pipeline.Statuses[ps.Workflow] = statuses
+
pipelines[key] = pipeline
+
}
+
+
var all []Pipeline
+
for _, p := range pipelines {
+
for _, s := range p.Statuses {
+
slices.SortFunc(s.data, func(a, b PipelineStatus) int {
+
if a.Created.After(b.Created) {
+
return 1
+
}
+
return -1
+
})
+
}
+
all = append(all, p)
+
}
+
+
return all, nil
+
}
+3 -3
appview/db/pulls.go
···
var args []any
for _, filter := range filters {
conditions = append(conditions, filter.Condition())
-
args = append(args, filter.arg)
+
args = append(args, filter.Arg()...)
}
whereClause := ""
···
for _, filter := range filters {
conditions = append(conditions, filter.Condition())
-
args = append(args, filter.arg)
+
args = append(args, filter.Arg()...)
}
whereClause := ""
···
for _, filter := range filters {
conditions = append(conditions, filter.Condition())
-
args = append(args, filter.arg)
+
args = append(args, filter.Arg()...)
}
whereClause := ""
+1 -1
appview/db/punchcard.go
···
var args []any
for _, filter := range filters {
conditions = append(conditions, filter.Condition())
-
args = append(args, filter.arg)
+
args = append(args, filter.Arg()...)
}
whereClause := ""
+3 -3
appview/db/spindle.go
···
var args []any
for _, filter := range filters {
conditions = append(conditions, filter.Condition())
-
args = append(args, filter.arg)
+
args = append(args, filter.Arg()...)
}
whereClause := ""
···
var args []any
for _, filter := range filters {
conditions = append(conditions, filter.Condition())
-
args = append(args, filter.arg)
+
args = append(args, filter.Arg()...)
}
whereClause := ""
···
var args []any
for _, filter := range filters {
conditions = append(conditions, filter.Condition())
-
args = append(args, filter.arg)
+
args = append(args, filter.Arg()...)
}
whereClause := ""