From 02cf40bd1b45e5c5ee05dedea88a6edf3c6b8952 Mon Sep 17 00:00:00 2001 From: oppiliappan Date: Tue, 17 Jun 2025 11:21:07 +0100 Subject: [PATCH] appview/db: add tables for pipelines, triggers and statuses Change-Id: kxyyrrwtwwpmpprprlywkztkxyppssmx Signed-off-by: oppiliappan --- appview/db/db.go | 63 ++++++++++++- appview/db/pipeline.go | 200 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 262 insertions(+), 1 deletion(-) create mode 100644 appview/db/pipeline.go diff --git a/appview/db/db.go b/appview/db/db.go index 3447d85..5f86714 100644 --- a/appview/db/db.go +++ b/appview/db/db.go @@ -331,10 +331,71 @@ func Make(dbPath string) (*DB, error) { unique(instance) ); + create table if not exists pipelines ( + -- identifiers + id integer primary key autoincrement, + knot text not null, + rkey text not null, + + repo_owner text not null, + repo_name text not null, + + -- every pipeline must be associated with exactly one commit + sha text not null check (length(sha) = 40), + + -- trigger data + trigger_id integer not null, + + unique(knot, rkey), + foreign key (trigger_id) references triggers(id) on delete cascade + ); + + create table if not exists triggers ( + -- primary key + id integer primary key autoincrement, + + -- top-level fields + kind text not null, + + -- pushTriggerData fields + push_ref text, + push_new_sha text check (length(push_new_sha) = 40), + push_old_sha text check (length(push_old_sha) = 40), + + -- pullRequestTriggerData fields + pr_source_branch text, + pr_target_branch text, + pr_source_sha text check (length(pr_source_sha) = 40), + pr_action text + ); + + create table if not exists pipeline_statuses ( + -- identifiers + id integer primary key autoincrement, + spindle text not null, + rkey text not null, + + -- referenced pipeline. these form the (did, rkey) pair + pipeline_knot text not null, + pipeline_rkey text not null, + + -- content + created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + workflow text not null, + status text not null, + error text, + exit_code integer not null default 0, + + unique (spindle, rkey), + foreign key (pipeline_knot, pipeline_rkey) + references pipelines (knot, rkey) + on delete cascade + ); + create table if not exists migrations ( id integer primary key autoincrement, name text unique - ) + ); `) if err != nil { return nil, err diff --git a/appview/db/pipeline.go b/appview/db/pipeline.go new file mode 100644 index 0000000..43a05b0 --- /dev/null +++ b/appview/db/pipeline.go @@ -0,0 +1,200 @@ +package db + +import ( + "fmt" + "strings" + "time" + + "github.com/bluesky-social/indigo/atproto/syntax" +) + +type Pipeline struct { + Id int + Rkey string + Knot string + RepoOwner syntax.DID + RepoName string + TriggerId int + Sha string + + // populate when querying for revers mappings + Trigger *Trigger +} + +type Trigger struct { + Id int + Kind string + + // push trigger fields + PushRef *string + PushNewSha *string + PushOldSha *string + + // pull request trigger fields + PRSourceBranch *string + PRTargetBranch *string + PRSourceSha *string + PRAction *string +} + +type PipelineStatus struct { + ID int + Spindle string + Rkey string + PipelineKnot string + PipelineRkey string + Created time.Time + Workflow string + Status string + Error *string + ExitCode int +} + +func GetPipelines(e Execer, filters ...filter) ([]Pipeline, error) { + var pipelines []Pipeline + + var conditions []string + var args []any + for _, filter := range filters { + conditions = append(conditions, filter.Condition()) + args = append(args, filter.arg) + } + + whereClause := "" + if conditions != nil { + whereClause = " where " + strings.Join(conditions, " and ") + } + + query := fmt.Sprintf(`select id, rkey, knot, repo_owner, repo_name, sha from pipelines %s`, whereClause) + + rows, err := e.Query(query, args...) + + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var pipeline Pipeline + err = rows.Scan( + &pipeline.Id, + &pipeline.Rkey, + &pipeline.Knot, + &pipeline.RepoOwner, + &pipeline.RepoName, + &pipeline.Sha, + ) + if err != nil { + return nil, err + } + + pipelines = append(pipelines, pipeline) + } + + if err = rows.Err(); err != nil { + return nil, err + } + + return pipelines, nil +} + +func AddPipeline(e Execer, pipeline Pipeline) error { + args := []any{ + pipeline.Rkey, + pipeline.Knot, + pipeline.RepoOwner, + pipeline.RepoName, + pipeline.TriggerId, + pipeline.Sha, + } + + placeholders := make([]string, len(args)) + for i := range placeholders { + placeholders[i] = "?" + } + + query := fmt.Sprintf(` + insert or ignore into pipelines ( + rkey, + knot, + repo_owner, + repo_name, + trigger_id, + sha + ) values (%s) + `, strings.Join(placeholders, ",")) + + _, err := e.Exec(query, args...) + + return err +} + +func AddTrigger(e Execer, trigger Trigger) (int64, error) { + args := []any{ + trigger.Kind, + trigger.PushRef, + trigger.PushNewSha, + trigger.PushOldSha, + trigger.PRSourceBranch, + trigger.PRTargetBranch, + trigger.PRSourceSha, + trigger.PRAction, + } + + placeholders := make([]string, len(args)) + for i := range placeholders { + placeholders[i] = "?" + } + + query := fmt.Sprintf(`insert or ignore into triggers ( + kind, + push_ref, + push_new_sha, + push_old_sha, + pr_source_branch, + pr_target_branch, + pr_source_sha, + pr_action + ) values (%s)`, strings.Join(placeholders, ",")) + + res, err := e.Exec(query, args...) + if err != nil { + return 0, err + } + + return res.LastInsertId() +} + +func AddPipelineStatus(e Execer, status PipelineStatus) error { + args := []any{ + status.Spindle, + status.Rkey, + status.PipelineKnot, + status.PipelineRkey, + status.Workflow, + status.Status, + status.Error, + status.ExitCode, + } + + placeholders := make([]string, len(args)) + for i := range placeholders { + placeholders[i] = "?" + } + + query := fmt.Sprintf(` + insert or ignore into pipeline_statuses ( + spindle, + rkey, + pipeline_knot, + pipeline_rkey, + workflow, + status, + error, + exit_code + ) values (%s) + `, strings.Join(placeholders, ",")) + + _, err := e.Exec(query, args...) + return err +} -- 2.43.0