package db import ( "fmt" "slices" "strings" "time" "tangled.org/core/appview/models" ) func GetPipelines(e Execer, filters ...filter) ([]models.Pipeline, error) { var pipelines []models.Pipeline var conditions []string var args []any for _, filter := range filters { conditions = append(conditions, filter.Condition()) args = append(args, filter.Arg()...) } whereClause := "" if conditions != nil { whereClause = " where " + strings.Join(conditions, " and ") } query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha, created from pipelines %s`, whereClause) rows, err := e.Query(query, args...) if err != nil { return nil, err } defer rows.Close() for rows.Next() { var pipeline models.Pipeline var createdAt string err = rows.Scan( &pipeline.Id, &pipeline.Rkey, &pipeline.Knot, &pipeline.RepoOwner, &pipeline.RepoName, &pipeline.Sha, &createdAt, ) if err != nil { return nil, err } if t, err := time.Parse(time.RFC3339, createdAt); err == nil { pipeline.Created = t } pipelines = append(pipelines, pipeline) } if err = rows.Err(); err != nil { return nil, err } return pipelines, nil } func AddPipeline(e Execer, pipeline models.Pipeline) error { args := []any{ pipeline.Rkey, pipeline.Knot, pipeline.RepoOwner, pipeline.RepoName, pipeline.TriggerId, pipeline.Sha, } placeholders := make([]string, len(args)) for i := range placeholders { placeholders[i] = "?" } query := fmt.Sprintf(` insert or ignore into pipelines ( rkey, knot, repo_owner, repo_name, trigger_id, sha ) values (%s) `, strings.Join(placeholders, ",")) _, err := e.Exec(query, args...) return err } func AddTrigger(e Execer, trigger models.Trigger) (int64, error) { args := []any{ trigger.Kind, trigger.PushRef, trigger.PushNewSha, trigger.PushOldSha, trigger.PRSourceBranch, trigger.PRTargetBranch, trigger.PRSourceSha, trigger.PRAction, } placeholders := make([]string, len(args)) for i := range placeholders { placeholders[i] = "?" } query := fmt.Sprintf(`insert or ignore into triggers ( kind, push_ref, push_new_sha, push_old_sha, pr_source_branch, pr_target_branch, pr_source_sha, pr_action ) values (%s)`, strings.Join(placeholders, ",")) res, err := e.Exec(query, args...) if err != nil { return 0, err } return res.LastInsertId() } func AddPipelineStatus(e Execer, status models.PipelineStatus) error { args := []any{ status.Spindle, status.Rkey, status.PipelineKnot, status.PipelineRkey, status.Workflow, status.Status, status.Error, status.ExitCode, status.Created.Format(time.RFC3339), } placeholders := make([]string, len(args)) for i := range placeholders { placeholders[i] = "?" } query := fmt.Sprintf(` insert or ignore into pipeline_statuses ( spindle, rkey, pipeline_knot, pipeline_rkey, workflow, status, error, exit_code, created ) values (%s) `, strings.Join(placeholders, ",")) _, err := e.Exec(query, args...) return err } // this is a mega query, but the most useful one: // get N pipelines, for each one get the latest status of its N workflows func GetPipelineStatuses(e Execer, filters ...filter) ([]models.Pipeline, error) { var conditions []string var args []any for _, filter := range filters { filter.key = "p." + filter.key // the table is aliased in the query to `p` conditions = append(conditions, filter.Condition()) args = append(args, filter.Arg()...) } whereClause := "" if conditions != nil { whereClause = " where " + strings.Join(conditions, " and ") } query := fmt.Sprintf(` select p.id, p.knot, p.rkey, p.repo_owner, p.repo_name, p.sha, p.created, t.id, t.kind, t.push_ref, t.push_new_sha, t.push_old_sha, t.pr_source_branch, t.pr_target_branch, t.pr_source_sha, t.pr_action from pipelines p join triggers t ON p.trigger_id = t.id %s `, whereClause) rows, err := e.Query(query, args...) if err != nil { return nil, err } defer rows.Close() pipelines := make(map[string]models.Pipeline) for rows.Next() { var p models.Pipeline var t models.Trigger var created string err := rows.Scan( &p.Id, &p.Knot, &p.Rkey, &p.RepoOwner, &p.RepoName, &p.Sha, &created, &p.TriggerId, &t.Kind, &t.PushRef, &t.PushNewSha, &t.PushOldSha, &t.PRSourceBranch, &t.PRTargetBranch, &t.PRSourceSha, &t.PRAction, ) if err != nil { return nil, err } p.Created, err = time.Parse(time.RFC3339, created) if err != nil { return nil, fmt.Errorf("invalid pipeline created timestamp %q: %w", created, err) } t.Id = p.TriggerId p.Trigger = &t p.Statuses = make(map[string]models.WorkflowStatus) k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey) pipelines[k] = p } // get all statuses // the where clause here is of the form: // // where (pipeline_knot = k1 and pipeline_rkey = r1) // or (pipeline_knot = k2 and pipeline_rkey = r2) conditions = nil args = nil for _, p := range pipelines { knotFilter := FilterEq("pipeline_knot", p.Knot) rkeyFilter := FilterEq("pipeline_rkey", p.Rkey) conditions = append(conditions, fmt.Sprintf("(%s and %s)", knotFilter.Condition(), rkeyFilter.Condition())) args = append(args, p.Knot) args = append(args, p.Rkey) } whereClause = "" if conditions != nil { whereClause = "where " + strings.Join(conditions, " or ") } query = fmt.Sprintf(` select id, spindle, rkey, pipeline_knot, pipeline_rkey, created, workflow, status, error, exit_code from pipeline_statuses %s `, whereClause) rows, err = e.Query(query, args...) if err != nil { return nil, err } defer rows.Close() for rows.Next() { var ps models.PipelineStatus var created string err := rows.Scan( &ps.ID, &ps.Spindle, &ps.Rkey, &ps.PipelineKnot, &ps.PipelineRkey, &created, &ps.Workflow, &ps.Status, &ps.Error, &ps.ExitCode, ) if err != nil { return nil, err } ps.Created, err = time.Parse(time.RFC3339, created) if err != nil { return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err) } key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey) // extract pipeline, ok := pipelines[key] if !ok { continue } statuses, _ := pipeline.Statuses[ps.Workflow] if !ok { pipeline.Statuses[ps.Workflow] = models.WorkflowStatus{} } // append statuses.Data = append(statuses.Data, ps) // reassign pipeline.Statuses[ps.Workflow] = statuses pipelines[key] = pipeline } var all []models.Pipeline for _, p := range pipelines { for _, s := range p.Statuses { slices.SortFunc(s.Data, func(a, b models.PipelineStatus) int { if a.Created.After(b.Created) { return 1 } if a.Created.Before(b.Created) { return -1 } if a.ID > b.ID { return 1 } if a.ID < b.ID { return -1 } return 0 }) } all = append(all, p) } // sort pipelines by date slices.SortFunc(all, func(a, b models.Pipeline) int { if a.Created.After(b.Created) { return -1 } return 1 }) return all, nil }