From abb36350f67860ed7c014a337cd9c2776e1c73a2 Mon Sep 17 00:00:00 2001 From: Winter Date: Thu, 7 Aug 2025 18:08:45 -0400 Subject: [PATCH] spindle: make workflows engine-agnostic Change-Id: vuoymuwoorlwskqmzwtyzrlvkkpnnypn Signed-off-by: Winter --- api/tangled/cbor_gen.go | 653 ++---------------- api/tangled/tangledpipeline.go | 22 +- cmd/gen.go | 2 - lexicons/pipeline/pipeline.json | 70 +- nix/modules/spindle.nix | 4 +- spindle/config/config.go | 8 +- spindle/engine/engine.go | 484 ++----------- spindle/engine/errors.go | 9 - .../nixery}/ansi_stripper.go | 2 +- spindle/engines/nixery/engine.go | 476 +++++++++++++ spindle/{engine => engines/nixery}/envs.go | 2 +- .../{engine => engines/nixery}/envs_test.go | 2 +- spindle/engines/nixery/errors.go | 7 + .../{models => engines/nixery}/setup_steps.go | 23 +- spindle/models/engine.go | 17 + spindle/{engine => models}/logger.go | 18 +- spindle/models/models.go | 6 +- spindle/models/pipeline.go | 111 +-- spindle/server.go | 46 +- spindle/stream.go | 3 +- spindle/xrpc/xrpc.go | 4 +- workflow/compile.go | 53 +- workflow/compile_test.go | 52 +- workflow/def.go | 39 +- workflow/def_test.go | 87 +-- 25 files changed, 756 insertions(+), 1444 deletions(-) delete mode 100644 spindle/engine/errors.go rename spindle/{engine => engines/nixery}/ansi_stripper.go (96%) create mode 100644 spindle/engines/nixery/engine.go rename spindle/{engine => engines/nixery}/envs.go (97%) rename spindle/{engine => engines/nixery}/envs_test.go (98%) create mode 100644 spindle/engines/nixery/errors.go rename spindle/{models => engines/nixery}/setup_steps.go (88%) create mode 100644 spindle/models/engine.go rename spindle/{engine => models}/logger.go (74%) diff --git a/api/tangled/cbor_gen.go b/api/tangled/cbor_gen.go index eba0337..257bf9c 100644 --- a/api/tangled/cbor_gen.go +++ b/api/tangled/cbor_gen.go @@ -2728,179 +2728,6 @@ func (t *Pipeline_CloneOpts) UnmarshalCBOR(r io.Reader) (err error) { return nil } -func (t *Pipeline_Dependency) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - - cw := cbg.NewCborWriter(w) - - if _, err := cw.Write([]byte{162}); err != nil { - return err - } - - // t.Packages ([]string) (slice) - if len("packages") > 1000000 { - return xerrors.Errorf("Value in field \"packages\" was too long") - } - - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("packages"))); err != nil { - return err - } - if _, err := cw.WriteString(string("packages")); err != nil { - return err - } - - if len(t.Packages) > 8192 { - return xerrors.Errorf("Slice value in field t.Packages was too long") - } - - if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.Packages))); err != nil { - return err - } - for _, v := range t.Packages { - if len(v) > 1000000 { - return xerrors.Errorf("Value in field v was too long") - } - - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(v))); err != nil { - return err - } - if _, err := cw.WriteString(string(v)); err != nil { - return err - } - - } - - // t.Registry (string) (string) - if len("registry") > 1000000 { - return xerrors.Errorf("Value in field \"registry\" was too long") - } - - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("registry"))); err != nil { - return err - } - if _, err := cw.WriteString(string("registry")); err != nil { - return err - } - - if len(t.Registry) > 1000000 { - return xerrors.Errorf("Value in field t.Registry was too long") - } - - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, 
uint64(len(t.Registry))); err != nil { - return err - } - if _, err := cw.WriteString(string(t.Registry)); err != nil { - return err - } - return nil -} - -func (t *Pipeline_Dependency) UnmarshalCBOR(r io.Reader) (err error) { - *t = Pipeline_Dependency{} - - cr := cbg.NewCborReader(r) - - maj, extra, err := cr.ReadHeader() - if err != nil { - return err - } - defer func() { - if err == io.EOF { - err = io.ErrUnexpectedEOF - } - }() - - if maj != cbg.MajMap { - return fmt.Errorf("cbor input should be of type map") - } - - if extra > cbg.MaxLength { - return fmt.Errorf("Pipeline_Dependency: map struct too large (%d)", extra) - } - - n := extra - - nameBuf := make([]byte, 8) - for i := uint64(0); i < n; i++ { - nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000) - if err != nil { - return err - } - - if !ok { - // Field doesn't exist on this type, so ignore it - if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil { - return err - } - continue - } - - switch string(nameBuf[:nameLen]) { - // t.Packages ([]string) (slice) - case "packages": - - maj, extra, err = cr.ReadHeader() - if err != nil { - return err - } - - if extra > 8192 { - return fmt.Errorf("t.Packages: array too large (%d)", extra) - } - - if maj != cbg.MajArray { - return fmt.Errorf("expected cbor array") - } - - if extra > 0 { - t.Packages = make([]string, extra) - } - - for i := 0; i < int(extra); i++ { - { - var maj byte - var extra uint64 - var err error - _ = maj - _ = extra - _ = err - - { - sval, err := cbg.ReadStringWithMax(cr, 1000000) - if err != nil { - return err - } - - t.Packages[i] = string(sval) - } - - } - } - // t.Registry (string) (string) - case "registry": - - { - sval, err := cbg.ReadStringWithMax(cr, 1000000) - if err != nil { - return err - } - - t.Registry = string(sval) - } - - default: - // Field doesn't exist on this type, so ignore it - if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil { - return err - } - } - } - - return nil -} func (t *Pipeline_ManualTriggerData) MarshalCBOR(w io.Writer) error { if t == nil { _, err := w.Write(cbg.CborNull) @@ -3916,223 +3743,6 @@ func (t *PipelineStatus) UnmarshalCBOR(r io.Reader) (err error) { return nil } -func (t *Pipeline_Step) MarshalCBOR(w io.Writer) error { - if t == nil { - _, err := w.Write(cbg.CborNull) - return err - } - - cw := cbg.NewCborWriter(w) - fieldCount := 3 - - if t.Environment == nil { - fieldCount-- - } - - if _, err := cw.Write(cbg.CborEncodeMajorType(cbg.MajMap, uint64(fieldCount))); err != nil { - return err - } - - // t.Name (string) (string) - if len("name") > 1000000 { - return xerrors.Errorf("Value in field \"name\" was too long") - } - - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("name"))); err != nil { - return err - } - if _, err := cw.WriteString(string("name")); err != nil { - return err - } - - if len(t.Name) > 1000000 { - return xerrors.Errorf("Value in field t.Name was too long") - } - - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Name))); err != nil { - return err - } - if _, err := cw.WriteString(string(t.Name)); err != nil { - return err - } - - // t.Command (string) (string) - if len("command") > 1000000 { - return xerrors.Errorf("Value in field \"command\" was too long") - } - - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("command"))); err != nil { - return err - } - if _, err := cw.WriteString(string("command")); err != nil { - return err - } - - if len(t.Command) > 1000000 { - return xerrors.Errorf("Value in field t.Command 
was too long") - } - - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Command))); err != nil { - return err - } - if _, err := cw.WriteString(string(t.Command)); err != nil { - return err - } - - // t.Environment ([]*tangled.Pipeline_Pair) (slice) - if t.Environment != nil { - - if len("environment") > 1000000 { - return xerrors.Errorf("Value in field \"environment\" was too long") - } - - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("environment"))); err != nil { - return err - } - if _, err := cw.WriteString(string("environment")); err != nil { - return err - } - - if len(t.Environment) > 8192 { - return xerrors.Errorf("Slice value in field t.Environment was too long") - } - - if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.Environment))); err != nil { - return err - } - for _, v := range t.Environment { - if err := v.MarshalCBOR(cw); err != nil { - return err - } - - } - } - return nil -} - -func (t *Pipeline_Step) UnmarshalCBOR(r io.Reader) (err error) { - *t = Pipeline_Step{} - - cr := cbg.NewCborReader(r) - - maj, extra, err := cr.ReadHeader() - if err != nil { - return err - } - defer func() { - if err == io.EOF { - err = io.ErrUnexpectedEOF - } - }() - - if maj != cbg.MajMap { - return fmt.Errorf("cbor input should be of type map") - } - - if extra > cbg.MaxLength { - return fmt.Errorf("Pipeline_Step: map struct too large (%d)", extra) - } - - n := extra - - nameBuf := make([]byte, 11) - for i := uint64(0); i < n; i++ { - nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000) - if err != nil { - return err - } - - if !ok { - // Field doesn't exist on this type, so ignore it - if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil { - return err - } - continue - } - - switch string(nameBuf[:nameLen]) { - // t.Name (string) (string) - case "name": - - { - sval, err := cbg.ReadStringWithMax(cr, 1000000) - if err != nil { - return err - } - - t.Name = string(sval) - } - // t.Command (string) (string) - case "command": - - { - sval, err := cbg.ReadStringWithMax(cr, 1000000) - if err != nil { - return err - } - - t.Command = string(sval) - } - // t.Environment ([]*tangled.Pipeline_Pair) (slice) - case "environment": - - maj, extra, err = cr.ReadHeader() - if err != nil { - return err - } - - if extra > 8192 { - return fmt.Errorf("t.Environment: array too large (%d)", extra) - } - - if maj != cbg.MajArray { - return fmt.Errorf("expected cbor array") - } - - if extra > 0 { - t.Environment = make([]*Pipeline_Pair, extra) - } - - for i := 0; i < int(extra); i++ { - { - var maj byte - var extra uint64 - var err error - _ = maj - _ = extra - _ = err - - { - - b, err := cr.ReadByte() - if err != nil { - return err - } - if b != cbg.CborNull[0] { - if err := cr.UnreadByte(); err != nil { - return err - } - t.Environment[i] = new(Pipeline_Pair) - if err := t.Environment[i].UnmarshalCBOR(cr); err != nil { - return xerrors.Errorf("unmarshaling t.Environment[i] pointer: %w", err) - } - } - - } - - } - } - - default: - // Field doesn't exist on this type, so ignore it - if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil { - return err - } - } - } - - return nil -} func (t *Pipeline_TriggerMetadata) MarshalCBOR(w io.Writer) error { if t == nil { _, err := w.Write(cbg.CborNull) @@ -4609,7 +4219,30 @@ func (t *Pipeline_Workflow) MarshalCBOR(w io.Writer) error { cw := cbg.NewCborWriter(w) - if _, err := cw.Write([]byte{165}); err != nil { + if _, err := cw.Write([]byte{164}); err != nil { + return err + } + + // t.Raw (string) 
(string) + if len("raw") > 1000000 { + return xerrors.Errorf("Value in field \"raw\" was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("raw"))); err != nil { + return err + } + if _, err := cw.WriteString(string("raw")); err != nil { + return err + } + + if len(t.Raw) > 1000000 { + return xerrors.Errorf("Value in field t.Raw was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Raw))); err != nil { + return err + } + if _, err := cw.WriteString(string(t.Raw)); err != nil { return err } @@ -4652,83 +4285,28 @@ func (t *Pipeline_Workflow) MarshalCBOR(w io.Writer) error { return err } - // t.Steps ([]*tangled.Pipeline_Step) (slice) - if len("steps") > 1000000 { - return xerrors.Errorf("Value in field \"steps\" was too long") - } - - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("steps"))); err != nil { - return err - } - if _, err := cw.WriteString(string("steps")); err != nil { - return err - } - - if len(t.Steps) > 8192 { - return xerrors.Errorf("Slice value in field t.Steps was too long") - } - - if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.Steps))); err != nil { - return err - } - for _, v := range t.Steps { - if err := v.MarshalCBOR(cw); err != nil { - return err - } - - } - - // t.Environment ([]*tangled.Pipeline_Pair) (slice) - if len("environment") > 1000000 { - return xerrors.Errorf("Value in field \"environment\" was too long") + // t.Engine (string) (string) + if len("engine") > 1000000 { + return xerrors.Errorf("Value in field \"engine\" was too long") } - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("environment"))); err != nil { - return err - } - if _, err := cw.WriteString(string("environment")); err != nil { + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("engine"))); err != nil { return err } - - if len(t.Environment) > 8192 { - return xerrors.Errorf("Slice value in field t.Environment was too long") - } - - if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.Environment))); err != nil { + if _, err := cw.WriteString(string("engine")); err != nil { return err } - for _, v := range t.Environment { - if err := v.MarshalCBOR(cw); err != nil { - return err - } - - } - // t.Dependencies ([]*tangled.Pipeline_Dependency) (slice) - if len("dependencies") > 1000000 { - return xerrors.Errorf("Value in field \"dependencies\" was too long") + if len(t.Engine) > 1000000 { + return xerrors.Errorf("Value in field t.Engine was too long") } - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("dependencies"))); err != nil { + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Engine))); err != nil { return err } - if _, err := cw.WriteString(string("dependencies")); err != nil { - return err - } - - if len(t.Dependencies) > 8192 { - return xerrors.Errorf("Slice value in field t.Dependencies was too long") - } - - if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.Dependencies))); err != nil { + if _, err := cw.WriteString(string(t.Engine)); err != nil { return err } - for _, v := range t.Dependencies { - if err := v.MarshalCBOR(cw); err != nil { - return err - } - - } return nil } @@ -4757,7 +4335,7 @@ func (t *Pipeline_Workflow) UnmarshalCBOR(r io.Reader) (err error) { n := extra - nameBuf := make([]byte, 12) + nameBuf := make([]byte, 6) for i := uint64(0); i < n; i++ { nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000) if err != nil { @@ -4773,7 +4351,18 @@ func (t *Pipeline_Workflow) 
UnmarshalCBOR(r io.Reader) (err error) { } switch string(nameBuf[:nameLen]) { - // t.Name (string) (string) + // t.Raw (string) (string) + case "raw": + + { + sval, err := cbg.ReadStringWithMax(cr, 1000000) + if err != nil { + return err + } + + t.Raw = string(sval) + } + // t.Name (string) (string) case "name": { @@ -4804,152 +4393,16 @@ func (t *Pipeline_Workflow) UnmarshalCBOR(r io.Reader) (err error) { } } - // t.Steps ([]*tangled.Pipeline_Step) (slice) - case "steps": - - maj, extra, err = cr.ReadHeader() - if err != nil { - return err - } - - if extra > 8192 { - return fmt.Errorf("t.Steps: array too large (%d)", extra) - } - - if maj != cbg.MajArray { - return fmt.Errorf("expected cbor array") - } - - if extra > 0 { - t.Steps = make([]*Pipeline_Step, extra) - } - - for i := 0; i < int(extra); i++ { - { - var maj byte - var extra uint64 - var err error - _ = maj - _ = extra - _ = err - - { - - b, err := cr.ReadByte() - if err != nil { - return err - } - if b != cbg.CborNull[0] { - if err := cr.UnreadByte(); err != nil { - return err - } - t.Steps[i] = new(Pipeline_Step) - if err := t.Steps[i].UnmarshalCBOR(cr); err != nil { - return xerrors.Errorf("unmarshaling t.Steps[i] pointer: %w", err) - } - } - - } - - } - } - // t.Environment ([]*tangled.Pipeline_Pair) (slice) - case "environment": - - maj, extra, err = cr.ReadHeader() - if err != nil { - return err - } - - if extra > 8192 { - return fmt.Errorf("t.Environment: array too large (%d)", extra) - } - - if maj != cbg.MajArray { - return fmt.Errorf("expected cbor array") - } - - if extra > 0 { - t.Environment = make([]*Pipeline_Pair, extra) - } - - for i := 0; i < int(extra); i++ { - { - var maj byte - var extra uint64 - var err error - _ = maj - _ = extra - _ = err - - { - - b, err := cr.ReadByte() - if err != nil { - return err - } - if b != cbg.CborNull[0] { - if err := cr.UnreadByte(); err != nil { - return err - } - t.Environment[i] = new(Pipeline_Pair) - if err := t.Environment[i].UnmarshalCBOR(cr); err != nil { - return xerrors.Errorf("unmarshaling t.Environment[i] pointer: %w", err) - } - } - - } + // t.Engine (string) (string) + case "engine": + { + sval, err := cbg.ReadStringWithMax(cr, 1000000) + if err != nil { + return err } - } - // t.Dependencies ([]*tangled.Pipeline_Dependency) (slice) - case "dependencies": - - maj, extra, err = cr.ReadHeader() - if err != nil { - return err - } - if extra > 8192 { - return fmt.Errorf("t.Dependencies: array too large (%d)", extra) - } - - if maj != cbg.MajArray { - return fmt.Errorf("expected cbor array") - } - - if extra > 0 { - t.Dependencies = make([]*Pipeline_Dependency, extra) - } - - for i := 0; i < int(extra); i++ { - { - var maj byte - var extra uint64 - var err error - _ = maj - _ = extra - _ = err - - { - - b, err := cr.ReadByte() - if err != nil { - return err - } - if b != cbg.CborNull[0] { - if err := cr.UnreadByte(); err != nil { - return err - } - t.Dependencies[i] = new(Pipeline_Dependency) - if err := t.Dependencies[i].UnmarshalCBOR(cr); err != nil { - return xerrors.Errorf("unmarshaling t.Dependencies[i] pointer: %w", err) - } - } - - } - - } + t.Engine = string(sval) } default: diff --git a/api/tangled/tangledpipeline.go b/api/tangled/tangledpipeline.go index ae625c4..9de7d45 100644 --- a/api/tangled/tangledpipeline.go +++ b/api/tangled/tangledpipeline.go @@ -29,12 +29,6 @@ type Pipeline_CloneOpts struct { Submodules bool `json:"submodules" cborgen:"submodules"` } -// Pipeline_Dependency is a "dependency" in the sh.tangled.pipeline schema. 
-type Pipeline_Dependency struct { - Packages []string `json:"packages" cborgen:"packages"` - Registry string `json:"registry" cborgen:"registry"` -} - // Pipeline_ManualTriggerData is a "manualTriggerData" in the sh.tangled.pipeline schema. type Pipeline_ManualTriggerData struct { Inputs []*Pipeline_Pair `json:"inputs,omitempty" cborgen:"inputs,omitempty"` @@ -61,13 +55,6 @@ type Pipeline_PushTriggerData struct { Ref string `json:"ref" cborgen:"ref"` } -// Pipeline_Step is a "step" in the sh.tangled.pipeline schema. -type Pipeline_Step struct { - Command string `json:"command" cborgen:"command"` - Environment []*Pipeline_Pair `json:"environment,omitempty" cborgen:"environment,omitempty"` - Name string `json:"name" cborgen:"name"` -} - // Pipeline_TriggerMetadata is a "triggerMetadata" in the sh.tangled.pipeline schema. type Pipeline_TriggerMetadata struct { Kind string `json:"kind" cborgen:"kind"` @@ -87,9 +74,8 @@ type Pipeline_TriggerRepo struct { // Pipeline_Workflow is a "workflow" in the sh.tangled.pipeline schema. type Pipeline_Workflow struct { - Clone *Pipeline_CloneOpts `json:"clone" cborgen:"clone"` - Dependencies []*Pipeline_Dependency `json:"dependencies" cborgen:"dependencies"` - Environment []*Pipeline_Pair `json:"environment" cborgen:"environment"` - Name string `json:"name" cborgen:"name"` - Steps []*Pipeline_Step `json:"steps" cborgen:"steps"` + Clone *Pipeline_CloneOpts `json:"clone" cborgen:"clone"` + Engine string `json:"engine" cborgen:"engine"` + Name string `json:"name" cborgen:"name"` + Raw string `json:"raw" cborgen:"raw"` } diff --git a/cmd/gen.go b/cmd/gen.go index 2e07c0e..dfda579 100644 --- a/cmd/gen.go +++ b/cmd/gen.go @@ -27,13 +27,11 @@ func main() { tangled.KnotMember{}, tangled.Pipeline{}, tangled.Pipeline_CloneOpts{}, - tangled.Pipeline_Dependency{}, tangled.Pipeline_ManualTriggerData{}, tangled.Pipeline_Pair{}, tangled.Pipeline_PullRequestTriggerData{}, tangled.Pipeline_PushTriggerData{}, tangled.PipelineStatus{}, - tangled.Pipeline_Step{}, tangled.Pipeline_TriggerMetadata{}, tangled.Pipeline_TriggerRepo{}, tangled.Pipeline_Workflow{}, diff --git a/lexicons/pipeline/pipeline.json b/lexicons/pipeline/pipeline.json index d9e9872..9c1138b 100644 --- a/lexicons/pipeline/pipeline.json +++ b/lexicons/pipeline/pipeline.json @@ -149,57 +149,23 @@ "type": "object", "required": [ "name", - "dependencies", - "steps", - "environment", - "clone" + "engine", + "clone", + "raw" ], "properties": { "name": { "type": "string" }, - "dependencies": { - "type": "array", - "items": { - "type": "ref", - "ref": "#dependency" - } - }, - "steps": { - "type": "array", - "items": { - "type": "ref", - "ref": "#step" - } - }, - "environment": { - "type": "array", - "items": { - "type": "ref", - "ref": "#pair" - } + "engine": { + "type": "string" }, "clone": { "type": "ref", "ref": "#cloneOpts" - } - } - }, - "dependency": { - "type": "object", - "required": [ - "registry", - "packages" - ], - "properties": { - "registry": { - "type": "string" }, - "packages": { - "type": "array", - "items": { - "type": "string" - } + "raw": { + "type": "string" } } }, @@ -222,28 +188,6 @@ } } }, - "step": { - "type": "object", - "required": [ - "name", - "command" - ], - "properties": { - "name": { - "type": "string" - }, - "command": { - "type": "string" - }, - "environment": { - "type": "array", - "items": { - "type": "ref", - "ref": "#pair" - } - } - } - }, "pair": { "type": "object", "required": [ diff --git a/nix/modules/spindle.nix b/nix/modules/spindle.nix index dbde1eb..35e6a6b 100644 --- 
a/nix/modules/spindle.nix +++ b/nix/modules/spindle.nix @@ -111,8 +111,8 @@ in "SPINDLE_SERVER_SECRETS_PROVIDER=${cfg.server.secrets.provider}" "SPINDLE_SERVER_SECRETS_OPENBAO_PROXY_ADDR=${cfg.server.secrets.openbao.proxyAddr}" "SPINDLE_SERVER_SECRETS_OPENBAO_MOUNT=${cfg.server.secrets.openbao.mount}" - "SPINDLE_PIPELINES_NIXERY=${cfg.pipelines.nixery}" - "SPINDLE_PIPELINES_WORKFLOW_TIMEOUT=${cfg.pipelines.workflowTimeout}" + "SPINDLE_NIXERY_PIPELINES_NIXERY=${cfg.pipelines.nixery}" + "SPINDLE_NIXERY_PIPELINES_WORKFLOW_TIMEOUT=${cfg.pipelines.workflowTimeout}" ]; ExecStart = "${cfg.package}/bin/spindle"; Restart = "always"; diff --git a/spindle/config/config.go b/spindle/config/config.go index c4bdafd..5719c5c 100644 --- a/spindle/config/config.go +++ b/spindle/config/config.go @@ -16,6 +16,7 @@ type Server struct { Dev bool `env:"DEV, default=false"` Owner string `env:"OWNER, required"` Secrets Secrets `env:",prefix=SECRETS_"` + LogDir string `env:"LOG_DIR, default=/var/log/spindle"` } func (s Server) Did() syntax.DID { @@ -32,15 +33,14 @@ type OpenBaoConfig struct { Mount string `env:"MOUNT, default=spindle"` } -type Pipelines struct { +type NixeryPipelines struct { Nixery string `env:"NIXERY, default=nixery.tangled.sh"` WorkflowTimeout string `env:"WORKFLOW_TIMEOUT, default=5m"` - LogDir string `env:"LOG_DIR, default=/var/log/spindle"` } type Config struct { - Server Server `env:",prefix=SPINDLE_SERVER_"` - Pipelines Pipelines `env:",prefix=SPINDLE_PIPELINES_"` + Server Server `env:",prefix=SPINDLE_SERVER_"` + NixeryPipelines NixeryPipelines `env:",prefix=SPINDLE_NIXERY_PIPELINES_"` } func Load(ctx context.Context) (*Config, error) { diff --git a/spindle/engine/engine.go b/spindle/engine/engine.go index 65e32eb..683fc58 100644 --- a/spindle/engine/engine.go +++ b/spindle/engine/engine.go @@ -4,23 +4,10 @@ import ( "context" "errors" "fmt" - "io" "log/slog" - "os" - "strings" - "sync" - "time" securejoin "github.com/cyphar/filepath-securejoin" - "github.com/docker/docker/api/types/container" - "github.com/docker/docker/api/types/image" - "github.com/docker/docker/api/types/mount" - "github.com/docker/docker/api/types/network" - "github.com/docker/docker/api/types/volume" - "github.com/docker/docker/client" - "github.com/docker/docker/pkg/stdcopy" "golang.org/x/sync/errgroup" - "tangled.sh/tangled.sh/core/log" "tangled.sh/tangled.sh/core/notifier" "tangled.sh/tangled.sh/core/spindle/config" "tangled.sh/tangled.sh/core/spindle/db" @@ -28,440 +15,101 @@ import ( "tangled.sh/tangled.sh/core/spindle/secrets" ) -const ( - workspaceDir = "/tangled/workspace" +var ( + ErrTimedOut = errors.New("timed out") + ErrWorkflowFailed = errors.New("workflow failed") ) -type cleanupFunc func(context.Context) error - -type Engine struct { - docker client.APIClient - l *slog.Logger - db *db.DB - n *notifier.Notifier - cfg *config.Config - vault secrets.Manager - - cleanupMu sync.Mutex - cleanup map[string][]cleanupFunc -} - -func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier, vault secrets.Manager) (*Engine, error) { - dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) - if err != nil { - return nil, err - } - - l := log.FromContext(ctx).With("component", "spindle") - - e := &Engine{ - docker: dcli, - l: l, - db: db, - n: n, - cfg: cfg, - vault: vault, - } - - e.cleanup = make(map[string][]cleanupFunc) - - return e, nil -} - -func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) { - 
e.l.Info("starting all workflows in parallel", "pipeline", pipelineId) +func StartWorkflows(l *slog.Logger, vault secrets.Manager, cfg *config.Config, db *db.DB, n *notifier.Notifier, ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) { + l.Info("starting all workflows in parallel", "pipeline", pipelineId) // extract secrets var allSecrets []secrets.UnlockedSecret if didSlashRepo, err := securejoin.SecureJoin(pipeline.RepoOwner, pipeline.RepoName); err == nil { - if res, err := e.vault.GetSecretsUnlocked(ctx, secrets.DidSlashRepo(didSlashRepo)); err == nil { + if res, err := vault.GetSecretsUnlocked(ctx, secrets.DidSlashRepo(didSlashRepo)); err == nil { allSecrets = res } } - workflowTimeoutStr := e.cfg.Pipelines.WorkflowTimeout - workflowTimeout, err := time.ParseDuration(workflowTimeoutStr) - if err != nil { - e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr) - workflowTimeout = 5 * time.Minute - } - e.l.Info("using workflow timeout", "timeout", workflowTimeout) - eg, ctx := errgroup.WithContext(ctx) - for _, w := range pipeline.Workflows { - eg.Go(func() error { - wid := models.WorkflowId{ - PipelineId: pipelineId, - Name: w.Name, - } - - err := e.db.StatusRunning(wid, e.n) - if err != nil { - return err - } - - err = e.SetupWorkflow(ctx, wid) - if err != nil { - e.l.Error("setting up worklow", "wid", wid, "err", err) - return err - } - defer e.DestroyWorkflow(ctx, wid) - - reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{}) - if err != nil { - e.l.Error("pipeline image pull failed!", "image", w.Image, "workflowId", wid, "error", err.Error()) + for eng, wfs := range pipeline.Workflows { + workflowTimeout := eng.WorkflowTimeout() + l.Info("using workflow timeout", "timeout", workflowTimeout) + + for _, w := range wfs { + eg.Go(func() error { + wid := models.WorkflowId{ + PipelineId: pipelineId, + Name: w.Name, + } - err := e.db.StatusFailed(wid, err.Error(), -1, e.n) + err := db.StatusRunning(wid, n) if err != nil { return err } - return fmt.Errorf("pulling image: %w", err) - } - defer reader.Close() - io.Copy(os.Stdout, reader) - - ctx, cancel := context.WithTimeout(ctx, workflowTimeout) - defer cancel() + err = eng.SetupWorkflow(ctx, wid, &w) + if err != nil { + // TODO(winter): Should this always set StatusFailed? + // In the original, we only do in a subset of cases. + l.Error("setting up worklow", "wid", wid, "err", err) - err = e.StartSteps(ctx, wid, w, allSecrets) - if err != nil { - if errors.Is(err, ErrTimedOut) { - dbErr := e.db.StatusTimeout(wid, e.n) - if dbErr != nil { - return dbErr - } - } else { - dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n) + dbErr := db.StatusFailed(wid, err.Error(), -1, n) if dbErr != nil { return dbErr } + return err } + defer eng.DestroyWorkflow(ctx, wid) - return fmt.Errorf("starting steps image: %w", err) - } - - err = e.db.StatusSuccess(wid, e.n) - if err != nil { - return err - } - - return nil - }) - } - - if err = eg.Wait(); err != nil { - e.l.Error("failed to run one or more workflows", "err", err) - } else { - e.l.Error("successfully ran full pipeline") - } -} - -// SetupWorkflow sets up a new network for the workflow and volumes for -// the workspace and Nix store. These are persisted across steps and are -// destroyed at the end of the workflow. 
-func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error { - e.l.Info("setting up workflow", "workflow", wid) - - _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{ - Name: workspaceVolume(wid), - Driver: "local", - }) - if err != nil { - return err - } - e.registerCleanup(wid, func(ctx context.Context) error { - return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true) - }) - - _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{ - Name: nixVolume(wid), - Driver: "local", - }) - if err != nil { - return err - } - e.registerCleanup(wid, func(ctx context.Context) error { - return e.docker.VolumeRemove(ctx, nixVolume(wid), true) - }) - - _, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{ - Driver: "bridge", - }) - if err != nil { - return err - } - e.registerCleanup(wid, func(ctx context.Context) error { - return e.docker.NetworkRemove(ctx, networkName(wid)) - }) - - return nil -} - -// StartSteps starts all steps sequentially with the same base image. -// ONLY marks pipeline as failed if container's exit code is non-zero. -// All other errors are bubbled up. -// Fixed version of the step execution logic -func (e *Engine) StartSteps(ctx context.Context, wid models.WorkflowId, w models.Workflow, secrets []secrets.UnlockedSecret) error { - workflowEnvs := ConstructEnvs(w.Environment) - for _, s := range secrets { - workflowEnvs.AddEnv(s.Key, s.Value) - } - - for stepIdx, step := range w.Steps { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - envs := append(EnvVars(nil), workflowEnvs...) - for k, v := range step.Environment { - envs.AddEnv(k, v) - } - envs.AddEnv("HOME", workspaceDir) - e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice()) - - hostConfig := hostConfig(wid) - resp, err := e.docker.ContainerCreate(ctx, &container.Config{ - Image: w.Image, - Cmd: []string{"bash", "-c", step.Command}, - WorkingDir: workspaceDir, - Tty: false, - Hostname: "spindle", - Env: envs.Slice(), - }, hostConfig, nil, nil, "") - defer e.DestroyStep(ctx, resp.ID) - if err != nil { - return fmt.Errorf("creating container: %w", err) - } - - err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil) - if err != nil { - return fmt.Errorf("connecting network: %w", err) - } - - err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) - if err != nil { - return err - } - e.l.Info("started container", "name", resp.ID, "step", step.Name) - - // start tailing logs in background - tailDone := make(chan error, 1) - go func() { - tailDone <- e.TailStep(ctx, resp.ID, wid, stepIdx, step) - }() - - // wait for container completion or timeout - waitDone := make(chan struct{}) - var state *container.State - var waitErr error - - go func() { - defer close(waitDone) - state, waitErr = e.WaitStep(ctx, resp.ID) - }() - - select { - case <-waitDone: - - // wait for tailing to complete - <-tailDone - - case <-ctx.Done(): - e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name) - err = e.DestroyStep(context.Background(), resp.ID) - if err != nil { - e.l.Error("failed to destroy step", "container", resp.ID, "error", err) - } - - // wait for both goroutines to finish - <-waitDone - <-tailDone - - return ErrTimedOut - } - - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - if waitErr != nil { - return waitErr - } - - err = e.DestroyStep(ctx, resp.ID) - if err != nil { - return err - } - - if state.ExitCode != 0 { - e.l.Error("workflow failed!", "workflow_id", 
wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled) - if state.OOMKilled { - return ErrOOMKilled - } - return ErrWorkflowFailed - } - } - - return nil -} - -func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) { - wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) - select { - case err := <-errCh: - if err != nil { - return nil, err - } - case <-wait: - } - - e.l.Info("waited for container", "name", containerID) - - info, err := e.docker.ContainerInspect(ctx, containerID) - if err != nil { - return nil, err - } - - return info.State, nil -} - -func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error { - wfLogger, err := NewWorkflowLogger(e.cfg.Pipelines.LogDir, wid) - if err != nil { - e.l.Warn("failed to setup step logger; logs will not be persisted", "error", err) - return err - } - defer wfLogger.Close() - - ctl := wfLogger.ControlWriter(stepIdx, step) - ctl.Write([]byte(step.Name)) - - logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ - Follow: true, - ShowStdout: true, - ShowStderr: true, - Details: false, - Timestamps: false, - }) - if err != nil { - return err - } - - _, err = stdcopy.StdCopy( - wfLogger.DataWriter("stdout"), - wfLogger.DataWriter("stderr"), - logs, - ) - if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) { - return fmt.Errorf("failed to copy logs: %w", err) - } - - return nil -} - -func (e *Engine) DestroyStep(ctx context.Context, containerID string) error { - err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL - if err != nil && !isErrContainerNotFoundOrNotRunning(err) { - return err - } + wfLogger, err := models.NewWorkflowLogger(cfg.Server.LogDir, wid) + if err != nil { + l.Warn("failed to setup step logger; logs will not be persisted", "error", err) + wfLogger = nil + } else { + defer wfLogger.Close() + } - if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{ - RemoveVolumes: true, - RemoveLinks: false, - Force: false, - }); err != nil && !isErrContainerNotFoundOrNotRunning(err) { - return err - } + ctx, cancel := context.WithTimeout(ctx, workflowTimeout) + defer cancel() - return nil -} + for stepIdx, step := range w.Steps { + if wfLogger != nil { + ctl := wfLogger.ControlWriter(stepIdx, step) + ctl.Write([]byte(step.Name())) + } -func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { - e.cleanupMu.Lock() - key := wid.String() + err = eng.RunStep(ctx, wid, &w, stepIdx, allSecrets, wfLogger) + if err != nil { + if errors.Is(err, ErrTimedOut) { + dbErr := db.StatusTimeout(wid, n) + if dbErr != nil { + return dbErr + } + } else { + dbErr := db.StatusFailed(wid, err.Error(), -1, n) + if dbErr != nil { + return dbErr + } + } + + return fmt.Errorf("starting steps image: %w", err) + } + } - fns := e.cleanup[key] - delete(e.cleanup, key) - e.cleanupMu.Unlock() + err = db.StatusSuccess(wid, n) + if err != nil { + return err + } - for _, fn := range fns { - if err := fn(ctx); err != nil { - e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err) + return nil + }) } } - return nil -} - -func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { - e.cleanupMu.Lock() - defer e.cleanupMu.Unlock() - - key := wid.String() - e.cleanup[key] = append(e.cleanup[key], fn) -} - -func workspaceVolume(wid models.WorkflowId) string { - 
return fmt.Sprintf("workspace-%s", wid) -} - -func nixVolume(wid models.WorkflowId) string { - return fmt.Sprintf("nix-%s", wid) -} - -func networkName(wid models.WorkflowId) string { - return fmt.Sprintf("workflow-network-%s", wid) -} -func hostConfig(wid models.WorkflowId) *container.HostConfig { - hostConfig := &container.HostConfig{ - Mounts: []mount.Mount{ - { - Type: mount.TypeVolume, - Source: workspaceVolume(wid), - Target: workspaceDir, - }, - { - Type: mount.TypeVolume, - Source: nixVolume(wid), - Target: "/nix", - }, - { - Type: mount.TypeTmpfs, - Target: "/tmp", - ReadOnly: false, - TmpfsOptions: &mount.TmpfsOptions{ - Mode: 0o1777, // world-writeable sticky bit - Options: [][]string{ - {"exec"}, - }, - }, - }, - { - Type: mount.TypeVolume, - Source: "etc-nix-" + wid.String(), - Target: "/etc/nix", - }, - }, - ReadonlyRootfs: false, - CapDrop: []string{"ALL"}, - CapAdd: []string{"CAP_DAC_OVERRIDE"}, - SecurityOpt: []string{"no-new-privileges"}, - ExtraHosts: []string{"host.docker.internal:host-gateway"}, + if err := eg.Wait(); err != nil { + l.Error("failed to run one or more workflows", "err", err) + } else { + l.Error("successfully ran full pipeline") } - - return hostConfig -} - -// thanks woodpecker -func isErrContainerNotFoundOrNotRunning(err error) bool { - // Error response from daemon: Cannot kill container: ...: No such container: ... - // Error response from daemon: Cannot kill container: ...: Container ... is not running" - // Error response from podman daemon: can only kill running containers. ... is in state exited - // Error: No such container: ... - return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers")) } diff --git a/spindle/engine/errors.go b/spindle/engine/errors.go deleted file mode 100644 index 47b052a..0000000 --- a/spindle/engine/errors.go +++ /dev/null @@ -1,9 +0,0 @@ -package engine - -import "errors" - -var ( - ErrOOMKilled = errors.New("oom killed") - ErrTimedOut = errors.New("timed out") - ErrWorkflowFailed = errors.New("workflow failed") -) diff --git a/spindle/engine/ansi_stripper.go b/spindle/engines/nixery/ansi_stripper.go similarity index 96% rename from spindle/engine/ansi_stripper.go rename to spindle/engines/nixery/ansi_stripper.go index f4ef864..db64ee6 100644 --- a/spindle/engine/ansi_stripper.go +++ b/spindle/engines/nixery/ansi_stripper.go @@ -1,4 +1,4 @@ -package engine +package nixery import ( "io" diff --git a/spindle/engines/nixery/engine.go b/spindle/engines/nixery/engine.go new file mode 100644 index 0000000..fe7017a --- /dev/null +++ b/spindle/engines/nixery/engine.go @@ -0,0 +1,476 @@ +package nixery + +import ( + "context" + "errors" + "fmt" + "io" + "log/slog" + "os" + "path" + "strings" + "sync" + "time" + + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/image" + "github.com/docker/docker/api/types/mount" + "github.com/docker/docker/api/types/network" + "github.com/docker/docker/api/types/volume" + "github.com/docker/docker/client" + "github.com/docker/docker/pkg/stdcopy" + "gopkg.in/yaml.v3" + "tangled.sh/tangled.sh/core/api/tangled" + "tangled.sh/tangled.sh/core/log" + "tangled.sh/tangled.sh/core/spindle/config" + "tangled.sh/tangled.sh/core/spindle/engine" + "tangled.sh/tangled.sh/core/spindle/models" + "tangled.sh/tangled.sh/core/spindle/secrets" +) + +const ( + workspaceDir = "/tangled/workspace" +) + +type cleanupFunc func(context.Context) error + 
+type Engine struct { + docker client.APIClient + l *slog.Logger + cfg *config.Config + + cleanupMu sync.Mutex + cleanup map[string][]cleanupFunc +} + +type Step struct { + name string + kind models.StepKind + command string + environment map[string]string +} + +func (s Step) Name() string { + return s.name +} + +func (s Step) Command() string { + return s.command +} + +func (s Step) Kind() models.StepKind { + return s.kind +} + +// setupSteps get added to start of Steps +type setupSteps []models.Step + +// addStep adds a step to the beginning of the workflow's steps. +func (ss *setupSteps) addStep(step models.Step) { + *ss = append(*ss, step) +} + +type addlFields struct { + image string + env map[string]string +} + +func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) { + swf := &models.Workflow{} + addl := addlFields{} + + dwf := &struct { + Steps []struct { + Command string `yaml:"command"` + Name string `yaml:"name"` + Environment map[string]string `yaml:"environment"` + } `yaml:"steps"` + Dependencies map[string][]string `yaml:"dependencies"` + Environment map[string]string `yaml:"environment"` + }{} + err := yaml.Unmarshal([]byte(twf.Raw), &dwf) + if err != nil { + return nil, err + } + + for _, dstep := range dwf.Steps { + sstep := Step{} + sstep.environment = dstep.Environment + sstep.command = dstep.Command + sstep.name = dstep.Name + sstep.kind = models.StepKindUser + swf.Steps = append(swf.Steps, sstep) + } + swf.Name = twf.Name + addl.env = dwf.Environment + addl.image = workflowImage(dwf.Dependencies, e.cfg.NixeryPipelines.Nixery) + + setup := &setupSteps{} + + setup.addStep(nixConfStep()) + setup.addStep(cloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev)) + // this step could be empty + if s := dependencyStep(dwf.Dependencies); s != nil { + setup.addStep(*s) + } + + // append setup steps in order to the start of workflow steps + swf.Steps = append(*setup, swf.Steps...) + swf.Data = addl + + return swf, nil +} + +func (e *Engine) WorkflowTimeout() time.Duration { + workflowTimeoutStr := e.cfg.NixeryPipelines.WorkflowTimeout + workflowTimeout, err := time.ParseDuration(workflowTimeoutStr) + if err != nil { + e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr) + workflowTimeout = 5 * time.Minute + } + + return workflowTimeout +} + +func workflowImage(deps map[string][]string, nixery string) string { + var dependencies string + for reg, ds := range deps { + if reg == "nixpkgs" { + dependencies = path.Join(ds...) + } + } + + // load defaults from somewhere else + dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix") + + return path.Join(nixery, dependencies) +} + +func New(ctx context.Context, cfg *config.Config) (*Engine, error) { + dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + if err != nil { + return nil, err + } + + l := log.FromContext(ctx).With("component", "spindle") + + e := &Engine{ + docker: dcli, + l: l, + cfg: cfg, + } + + e.cleanup = make(map[string][]cleanupFunc) + + return e, nil +} + +// SetupWorkflow sets up a new network for the workflow and volumes for +// the workspace and Nix store. These are persisted across steps and are +// destroyed at the end of the workflow. 
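+// The workflow's image is also pulled here, once up front, so that the +// individual step containers can start without pulling it again.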
+func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error { + e.l.Info("setting up workflow", "workflow", wid) + + _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{ + Name: workspaceVolume(wid), + Driver: "local", + }) + if err != nil { + return err + } + e.registerCleanup(wid, func(ctx context.Context) error { + return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true) + }) + + _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{ + Name: nixVolume(wid), + Driver: "local", + }) + if err != nil { + return err + } + e.registerCleanup(wid, func(ctx context.Context) error { + return e.docker.VolumeRemove(ctx, nixVolume(wid), true) + }) + + _, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{ + Driver: "bridge", + }) + if err != nil { + return err + } + e.registerCleanup(wid, func(ctx context.Context) error { + return e.docker.NetworkRemove(ctx, networkName(wid)) + }) + + addl := wf.Data.(addlFields) + + reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{}) + if err != nil { + e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error()) + + return fmt.Errorf("pulling image: %w", err) + } + defer reader.Close() + io.Copy(os.Stdout, reader) + + return nil +} + +func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error { + workflowEnvs := ConstructEnvs(w.Data.(addlFields).env) + for _, s := range secrets { + workflowEnvs.AddEnv(s.Key, s.Value) + } + + step := w.Steps[idx].(Step) + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + envs := append(EnvVars(nil), workflowEnvs...) + for k, v := range step.environment { + envs.AddEnv(k, v) + } + envs.AddEnv("HOME", workspaceDir) + e.l.Debug("envs for step", "step", step.Name(), "envs", envs.Slice()) + + hostConfig := hostConfig(wid) + resp, err := e.docker.ContainerCreate(ctx, &container.Config{ + Image: w.Data.(addlFields).image, + Cmd: []string{"bash", "-c", step.command}, + WorkingDir: workspaceDir, + Tty: false, + Hostname: "spindle", + Env: envs.Slice(), + }, hostConfig, nil, nil, "") + if err != nil { + return fmt.Errorf("creating container: %w", err) + } + defer e.DestroyStep(ctx, resp.ID) + + err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil) + if err != nil { + return fmt.Errorf("connecting network: %w", err) + } + + err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) + if err != nil { + return err + } + e.l.Info("started container", "name", resp.ID, "step", step.Name()) + + // start tailing logs in background + tailDone := make(chan error, 1) + go func() { + tailDone <- e.TailStep(ctx, wfLogger, resp.ID, wid, idx, step) + }() + + // wait for container completion or timeout + waitDone := make(chan struct{}) + var state *container.State + var waitErr error + + go func() { + defer close(waitDone) + state, waitErr = e.WaitStep(ctx, resp.ID) + }() + + select { + case <-waitDone: + + // wait for tailing to complete + <-tailDone + + case <-ctx.Done(): + e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name()) + err = e.DestroyStep(context.Background(), resp.ID) + if err != nil { + e.l.Error("failed to destroy step", "container", resp.ID, "error", err) + } + + // wait for both goroutines to finish + <-waitDone + <-tailDone + + return engine.ErrTimedOut + } + + select { + case <-ctx.Done(): + return ctx.Err() + 
default: + } + + if waitErr != nil { + return waitErr + } + + err = e.DestroyStep(ctx, resp.ID) + if err != nil { + return err + } + + if state.ExitCode != 0 { + e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled) + if state.OOMKilled { + return ErrOOMKilled + } + return engine.ErrWorkflowFailed + } + + return nil +} + +func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) { + wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) + select { + case err := <-errCh: + if err != nil { + return nil, err + } + case <-wait: + } + + e.l.Info("waited for container", "name", containerID) + + info, err := e.docker.ContainerInspect(ctx, containerID) + if err != nil { + return nil, err + } + + return info.State, nil +} + +func (e *Engine) TailStep(ctx context.Context, wfLogger *models.WorkflowLogger, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error { + if wfLogger == nil { + return nil + } + + logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ + Follow: true, + ShowStdout: true, + ShowStderr: true, + Details: false, + Timestamps: false, + }) + if err != nil { + return err + } + + _, err = stdcopy.StdCopy( + wfLogger.DataWriter("stdout"), + wfLogger.DataWriter("stderr"), + logs, + ) + if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) { + return fmt.Errorf("failed to copy logs: %w", err) + } + + return nil +} + +func (e *Engine) DestroyStep(ctx context.Context, containerID string) error { + err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL + if err != nil && !isErrContainerNotFoundOrNotRunning(err) { + return err + } + + if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{ + RemoveVolumes: true, + RemoveLinks: false, + Force: false, + }); err != nil && !isErrContainerNotFoundOrNotRunning(err) { + return err + } + + return nil +} + +func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { + e.cleanupMu.Lock() + key := wid.String() + + fns := e.cleanup[key] + delete(e.cleanup, key) + e.cleanupMu.Unlock() + + for _, fn := range fns { + if err := fn(ctx); err != nil { + e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err) + } + } + return nil +} + +func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { + e.cleanupMu.Lock() + defer e.cleanupMu.Unlock() + + key := wid.String() + e.cleanup[key] = append(e.cleanup[key], fn) +} + +func workspaceVolume(wid models.WorkflowId) string { + return fmt.Sprintf("workspace-%s", wid) +} + +func nixVolume(wid models.WorkflowId) string { + return fmt.Sprintf("nix-%s", wid) +} + +func networkName(wid models.WorkflowId) string { + return fmt.Sprintf("workflow-network-%s", wid) +} + +func hostConfig(wid models.WorkflowId) *container.HostConfig { + hostConfig := &container.HostConfig{ + Mounts: []mount.Mount{ + { + Type: mount.TypeVolume, + Source: workspaceVolume(wid), + Target: workspaceDir, + }, + { + Type: mount.TypeVolume, + Source: nixVolume(wid), + Target: "/nix", + }, + { + Type: mount.TypeTmpfs, + Target: "/tmp", + ReadOnly: false, + TmpfsOptions: &mount.TmpfsOptions{ + Mode: 0o1777, // world-writeable sticky bit + Options: [][]string{ + {"exec"}, + }, + }, + }, + { + Type: mount.TypeVolume, + Source: "etc-nix-" + wid.String(), + Target: "/etc/nix", + }, + }, + ReadonlyRootfs: false, + CapDrop: []string{"ALL"}, + 
CapAdd: []string{"CAP_DAC_OVERRIDE"}, + SecurityOpt: []string{"no-new-privileges"}, + ExtraHosts: []string{"host.docker.internal:host-gateway"}, + } + + return hostConfig +} + +// thanks woodpecker +func isErrContainerNotFoundOrNotRunning(err error) bool { + // Error response from daemon: Cannot kill container: ...: No such container: ... + // Error response from daemon: Cannot kill container: ...: Container ... is not running" + // Error response from podman daemon: can only kill running containers. ... is in state exited + // Error: No such container: ... + return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers")) +} diff --git a/spindle/engine/envs.go b/spindle/engines/nixery/envs.go similarity index 97% rename from spindle/engine/envs.go rename to spindle/engines/nixery/envs.go index 440a9a6..9aafc34 100644 --- a/spindle/engine/envs.go +++ b/spindle/engines/nixery/envs.go @@ -1,4 +1,4 @@ -package engine +package nixery import ( "fmt" diff --git a/spindle/engine/envs_test.go b/spindle/engines/nixery/envs_test.go similarity index 98% rename from spindle/engine/envs_test.go rename to spindle/engines/nixery/envs_test.go index ee7045f..9c2c961 100644 --- a/spindle/engine/envs_test.go +++ b/spindle/engines/nixery/envs_test.go @@ -1,4 +1,4 @@ -package engine +package nixery import ( "testing" diff --git a/spindle/engines/nixery/errors.go b/spindle/engines/nixery/errors.go new file mode 100644 index 0000000..0a827e7 --- /dev/null +++ b/spindle/engines/nixery/errors.go @@ -0,0 +1,7 @@ +package nixery + +import "errors" + +var ( + ErrOOMKilled = errors.New("oom killed") +) diff --git a/spindle/models/setup_steps.go b/spindle/engines/nixery/setup_steps.go similarity index 88% rename from spindle/models/setup_steps.go rename to spindle/engines/nixery/setup_steps.go index 3aaff99..a502b5f 100644 --- a/spindle/models/setup_steps.go +++ b/spindle/engines/nixery/setup_steps.go @@ -1,4 +1,4 @@ -package models +package nixery import ( "fmt" @@ -13,8 +13,8 @@ func nixConfStep() Step { setupCmd := `echo 'extra-experimental-features = nix-command flakes' >> /etc/nix/nix.conf echo 'build-users-group = ' >> /etc/nix/nix.conf` return Step{ - Command: setupCmd, - Name: "Configure Nix", + command: setupCmd, + name: "Configure Nix", } } @@ -81,8 +81,8 @@ func cloneStep(twf tangled.Pipeline_Workflow, tr tangled.Pipeline_TriggerMetadat commands = append(commands, "git checkout FETCH_HEAD") cloneStep := Step{ - Command: strings.Join(commands, "\n"), - Name: "Clone repository into workspace", + command: strings.Join(commands, "\n"), + name: "Clone repository into workspace", } return cloneStep } @@ -91,13 +91,10 @@ func cloneStep(twf tangled.Pipeline_Workflow, tr tangled.Pipeline_TriggerMetadat // For dependencies using a custom registry (i.e. not nixpkgs), it collects // all packages and adds a single 'nix profile install' step to the // beginning of the workflow's step list. 
-func dependencyStep(twf tangled.Pipeline_Workflow) *Step { +func dependencyStep(deps map[string][]string) *Step { var customPackages []string - for _, d := range twf.Dependencies { - registry := d.Registry - packages := d.Packages - + for registry, packages := range deps { if registry == "nixpkgs" { continue } @@ -115,9 +112,9 @@ func dependencyStep(twf tangled.Pipeline_Workflow) *Step { installCmd := "nix --extra-experimental-features nix-command --extra-experimental-features flakes profile install" cmd := fmt.Sprintf("%s %s", installCmd, strings.Join(customPackages, " ")) installStep := Step{ - Command: cmd, - Name: "Install custom dependencies", - Environment: map[string]string{ + command: cmd, + name: "Install custom dependencies", + environment: map[string]string{ "NIX_NO_COLOR": "1", "NIX_SHOW_DOWNLOAD_PROGRESS": "0", }, diff --git a/spindle/models/engine.go b/spindle/models/engine.go new file mode 100644 index 0000000..8ae5f83 --- /dev/null +++ b/spindle/models/engine.go @@ -0,0 +1,17 @@ +package models + +import ( + "context" + "time" + + "tangled.sh/tangled.sh/core/api/tangled" + "tangled.sh/tangled.sh/core/spindle/secrets" +) + +type Engine interface { + InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*Workflow, error) + SetupWorkflow(ctx context.Context, wid WorkflowId, wf *Workflow) error + WorkflowTimeout() time.Duration + DestroyWorkflow(ctx context.Context, wid WorkflowId) error + RunStep(ctx context.Context, wid WorkflowId, w *Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *WorkflowLogger) error +} diff --git a/spindle/engine/logger.go b/spindle/models/logger.go similarity index 74% rename from spindle/engine/logger.go rename to spindle/models/logger.go index ba7d468..194fb1a 100644 --- a/spindle/engine/logger.go +++ b/spindle/models/logger.go @@ -1,4 +1,4 @@ -package engine +package models import ( "encoding/json" @@ -7,8 +7,6 @@ import ( "os" "path/filepath" "strings" - - "tangled.sh/tangled.sh/core/spindle/models" ) type WorkflowLogger struct { @@ -16,7 +14,7 @@ type WorkflowLogger struct { encoder *json.Encoder } -func NewWorkflowLogger(baseDir string, wid models.WorkflowId) (*WorkflowLogger, error) { +func NewWorkflowLogger(baseDir string, wid WorkflowId) (*WorkflowLogger, error) { path := LogFilePath(baseDir, wid) file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) @@ -30,7 +28,7 @@ func NewWorkflowLogger(baseDir string, wid models.WorkflowId) (*WorkflowLogger, }, nil } -func LogFilePath(baseDir string, workflowID models.WorkflowId) string { +func LogFilePath(baseDir string, workflowID WorkflowId) string { logFilePath := filepath.Join(baseDir, fmt.Sprintf("%s.log", workflowID.String())) return logFilePath } @@ -47,7 +45,7 @@ func (l *WorkflowLogger) DataWriter(stream string) io.Writer { } } -func (l *WorkflowLogger) ControlWriter(idx int, step models.Step) io.Writer { +func (l *WorkflowLogger) ControlWriter(idx int, step Step) io.Writer { return &controlWriter{ logger: l, idx: idx, @@ -62,7 +60,7 @@ type dataWriter struct { func (w *dataWriter) Write(p []byte) (int, error) { line := strings.TrimRight(string(p), "\r\n") - entry := models.NewDataLogLine(line, w.stream) + entry := NewDataLogLine(line, w.stream) if err := w.logger.encoder.Encode(entry); err != nil { return 0, err } @@ -72,13 +70,13 @@ func (w *dataWriter) Write(p []byte) (int, error) { type controlWriter struct { logger *WorkflowLogger idx int - step models.Step + step Step } func (w *controlWriter) Write(_ []byte) (int, error) { - entry := 
models.NewControlLogLine(w.idx, w.step) + entry := NewControlLogLine(w.idx, w.step) if err := w.logger.encoder.Encode(entry); err != nil { return 0, err } - return len(w.step.Name), nil + return len(w.step.Name()), nil } diff --git a/spindle/models/models.go b/spindle/models/models.go index 6199788..538f17e 100644 --- a/spindle/models/models.go +++ b/spindle/models/models.go @@ -104,9 +104,9 @@ func NewDataLogLine(content, stream string) LogLine { func NewControlLogLine(idx int, step Step) LogLine { return LogLine{ Kind: LogKindControl, - Content: step.Name, + Content: step.Name(), StepId: idx, - StepKind: step.Kind, - StepCommand: step.Command, + StepKind: step.Kind(), + StepCommand: step.Command(), } } diff --git a/spindle/models/pipeline.go b/spindle/models/pipeline.go index 8561b21..3919d8b 100644 --- a/spindle/models/pipeline.go +++ b/spindle/models/pipeline.go @@ -1,23 +1,15 @@ package models -import ( - "path" - - "tangled.sh/tangled.sh/core/api/tangled" - "tangled.sh/tangled.sh/core/spindle/config" -) - type Pipeline struct { RepoOwner string RepoName string - Workflows []Workflow + Workflows map[Engine][]Workflow } -type Step struct { - Command string - Name string - Environment map[string]string - Kind StepKind +type Step interface { + Name() string + Command() string + Kind() StepKind } type StepKind int @@ -30,94 +22,7 @@ const ( ) type Workflow struct { - Steps []Step - Environment map[string]string - Name string - Image string -} - -// setupSteps get added to start of Steps -type setupSteps []Step - -// addStep adds a step to the beginning of the workflow's steps. -func (ss *setupSteps) addStep(step Step) { - *ss = append(*ss, step) -} - -// ToPipeline converts a tangled.Pipeline into a model.Pipeline. -// In the process, dependencies are resolved: nixpkgs deps -// are constructed atop nixery and set as the Workflow.Image, -// and ones from custom registries -func ToPipeline(pl tangled.Pipeline, cfg config.Config) *Pipeline { - workflows := []Workflow{} - - for _, twf := range pl.Workflows { - swf := &Workflow{} - for _, tstep := range twf.Steps { - sstep := Step{} - sstep.Environment = stepEnvToMap(tstep.Environment) - sstep.Command = tstep.Command - sstep.Name = tstep.Name - sstep.Kind = StepKindUser - swf.Steps = append(swf.Steps, sstep) - } - swf.Name = twf.Name - swf.Environment = workflowEnvToMap(twf.Environment) - swf.Image = workflowImage(twf.Dependencies, cfg.Pipelines.Nixery) - - setup := &setupSteps{} - - setup.addStep(nixConfStep()) - setup.addStep(cloneStep(*twf, *pl.TriggerMetadata, cfg.Server.Dev)) - // this step could be empty - if s := dependencyStep(*twf); s != nil { - setup.addStep(*s) - } - - // append setup steps in order to the start of workflow steps - swf.Steps = append(*setup, swf.Steps...) 
diff --git a/spindle/models/pipeline.go b/spindle/models/pipeline.go
index 8561b21..3919d8b 100644
--- a/spindle/models/pipeline.go
+++ b/spindle/models/pipeline.go
@@ -1,23 +1,15 @@
 package models
 
-import (
-	"path"
-
-	"tangled.sh/tangled.sh/core/api/tangled"
-	"tangled.sh/tangled.sh/core/spindle/config"
-)
-
 type Pipeline struct {
 	RepoOwner string
 	RepoName  string
-	Workflows []Workflow
+	Workflows map[Engine][]Workflow
 }
 
-type Step struct {
-	Command     string
-	Name        string
-	Environment map[string]string
-	Kind        StepKind
+type Step interface {
+	Name() string
+	Command() string
+	Kind() StepKind
 }
 
 type StepKind int
@@ -30,94 +22,7 @@ const (
 )
 
 type Workflow struct {
-	Steps       []Step
-	Environment map[string]string
-	Name        string
-	Image       string
-}
-
-// setupSteps get added to start of Steps
-type setupSteps []Step
-
-// addStep adds a step to the beginning of the workflow's steps.
-func (ss *setupSteps) addStep(step Step) {
-	*ss = append(*ss, step)
-}
-
-// ToPipeline converts a tangled.Pipeline into a model.Pipeline.
-// In the process, dependencies are resolved: nixpkgs deps
-// are constructed atop nixery and set as the Workflow.Image,
-// and ones from custom registries
-func ToPipeline(pl tangled.Pipeline, cfg config.Config) *Pipeline {
-	workflows := []Workflow{}
-
-	for _, twf := range pl.Workflows {
-		swf := &Workflow{}
-		for _, tstep := range twf.Steps {
-			sstep := Step{}
-			sstep.Environment = stepEnvToMap(tstep.Environment)
-			sstep.Command = tstep.Command
-			sstep.Name = tstep.Name
-			sstep.Kind = StepKindUser
-			swf.Steps = append(swf.Steps, sstep)
-		}
-		swf.Name = twf.Name
-		swf.Environment = workflowEnvToMap(twf.Environment)
-		swf.Image = workflowImage(twf.Dependencies, cfg.Pipelines.Nixery)
-
-		setup := &setupSteps{}
-
-		setup.addStep(nixConfStep())
-		setup.addStep(cloneStep(*twf, *pl.TriggerMetadata, cfg.Server.Dev))
-		// this step could be empty
-		if s := dependencyStep(*twf); s != nil {
-			setup.addStep(*s)
-		}
-
-		// append setup steps in order to the start of workflow steps
-		swf.Steps = append(*setup, swf.Steps...)
-
-		workflows = append(workflows, *swf)
-	}
-	repoOwner := pl.TriggerMetadata.Repo.Did
-	repoName := pl.TriggerMetadata.Repo.Repo
-	return &Pipeline{
-		RepoOwner: repoOwner,
-		RepoName:  repoName,
-		Workflows: workflows,
-	}
-}
-
-func workflowEnvToMap(envs []*tangled.Pipeline_Pair) map[string]string {
-	envMap := map[string]string{}
-	for _, env := range envs {
-		if env != nil {
-			envMap[env.Key] = env.Value
-		}
-	}
-	return envMap
-}
-
-func stepEnvToMap(envs []*tangled.Pipeline_Pair) map[string]string {
-	envMap := map[string]string{}
-	for _, env := range envs {
-		if env != nil {
-			envMap[env.Key] = env.Value
-		}
-	}
-	return envMap
-}
-
-func workflowImage(deps []*tangled.Pipeline_Dependency, nixery string) string {
-	var dependencies string
-	for _, d := range deps {
-		if d.Registry == "nixpkgs" {
-			dependencies = path.Join(d.Packages...)
-		}
-	}
-
-	// load defaults from somewhere else
-	dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix")
-
-	return path.Join(nixery, dependencies)
+	Steps []Step
+	Name  string
+	Data  any
 }
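
Step is now behaviour rather than data: each engine keeps its own concrete step type. A sketch of how the lowercase fields used in engines/nixery above would satisfy the interface (the exact nixery definition is outside this hunk):

package nixery

import "tangled.sh/tangled.sh/core/spindle/models"

// Step mirrors the literal used in dependencyStep above; the field names
// match that hunk, the method set is what models.Step requires.
type Step struct {
	command     string
	name        string
	environment map[string]string
	kind        models.StepKind
}

func (s Step) Name() string          { return s.name }
func (s Step) Command() string       { return s.command }
func (s Step) Kind() models.StepKind { return s.kind }

// compile-time check that the engine-private type satisfies the interface
var _ models.Step = Step{}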
diff --git a/spindle/server.go b/spindle/server.go
index a04924c..3542e9b 100644
--- a/spindle/server.go
+++ b/spindle/server.go
@@ -20,6 +20,7 @@ import (
 	"tangled.sh/tangled.sh/core/spindle/config"
 	"tangled.sh/tangled.sh/core/spindle/db"
 	"tangled.sh/tangled.sh/core/spindle/engine"
+	"tangled.sh/tangled.sh/core/spindle/engines/nixery"
 	"tangled.sh/tangled.sh/core/spindle/models"
 	"tangled.sh/tangled.sh/core/spindle/queue"
 	"tangled.sh/tangled.sh/core/spindle/secrets"
@@ -39,7 +40,7 @@ type Spindle struct {
 	e    *rbac.Enforcer
 	l    *slog.Logger
 	n    *notifier.Notifier
-	eng  *engine.Engine
+	engs map[string]models.Engine
 	jq   *queue.Queue
 	cfg  *config.Config
 	ks   *eventconsumer.Consumer
@@ -93,7 +94,7 @@ func Run(ctx context.Context) error {
 		return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
 	}
 
-	eng, err := engine.New(ctx, cfg, d, &n, vault)
+	nixeryEng, err := nixery.New(ctx, cfg)
 	if err != nil {
 		return err
 	}
@@ -119,7 +120,7 @@ func Run(ctx context.Context) error {
 		db:   d,
 		l:    logger,
 		n:    &n,
-		eng:  eng,
+		engs: map[string]models.Engine{"nixery": nixeryEng},
 		jq:   jq,
 		cfg:  cfg,
 		res:  resolver,
@@ -207,7 +208,7 @@ func (s *Spindle) XrpcRouter() http.Handler {
 		Logger:   logger,
 		Db:       s.db,
 		Enforcer: s.e,
-		Engine:   s.eng,
+		Engines:  s.engs,
 		Config:   s.cfg,
 		Resolver: s.res,
 		Vault:    s.vault,
@@ -248,9 +249,36 @@ func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source,
 		Rkey: msg.Rkey,
 	}
 
+	workflows := make(map[models.Engine][]models.Workflow)
+
 	for _, w := range tpl.Workflows {
 		if w != nil {
-			err := s.db.StatusPending(models.WorkflowId{
+			if _, ok := s.engs[w.Engine]; !ok {
+				err = s.db.StatusFailed(models.WorkflowId{
+					PipelineId: pipelineId,
+					Name:       w.Name,
+				}, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
+				if err != nil {
+					return err
+				}
+
+				continue
+			}
+
+			eng := s.engs[w.Engine]
+
+			if _, ok := workflows[eng]; !ok {
+				workflows[eng] = []models.Workflow{}
+			}
+
+			ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
+			if err != nil {
+				return err
+			}
+
+			workflows[eng] = append(workflows[eng], *ewf)
+
+			err = s.db.StatusPending(models.WorkflowId{
 				PipelineId: pipelineId,
 				Name:       w.Name,
 			}, s.n)
@@ -260,11 +288,13 @@ func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source,
 		}
 	}
 
-	spl := models.ToPipeline(tpl, *s.cfg)
-
 	ok := s.jq.Enqueue(queue.Job{
 		Run: func() error {
-			s.eng.StartWorkflows(ctx, spl, pipelineId)
+			engine.StartWorkflows(s.l, s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
+				RepoOwner: tpl.TriggerMetadata.Repo.Did,
+				RepoName:  tpl.TriggerMetadata.Repo.Repo,
+				Workflows: workflows,
+			}, pipelineId)
 			return nil
 		},
 		OnFail: func(jobError error) {
diff --git a/spindle/stream.go b/spindle/stream.go
index 8eed7a6..6167487 100644
--- a/spindle/stream.go
+++ b/spindle/stream.go
@@ -9,7 +9,6 @@ import (
 	"strconv"
 	"time"
 
-	"tangled.sh/tangled.sh/core/spindle/engine"
 	"tangled.sh/tangled.sh/core/spindle/models"
 
 	"github.com/go-chi/chi/v5"
@@ -143,7 +142,7 @@ func (s *Spindle) streamLogsFromDisk(ctx context.Context, conn *websocket.Conn,
 	}
 
 	isFinished := models.StatusKind(status.Status).IsFinish()
-	filePath := engine.LogFilePath(s.cfg.Pipelines.LogDir, wid)
+	filePath := models.LogFilePath(s.cfg.Server.LogDir, wid)
 
 	config := tail.Config{
 		Follow: !isFinished,
diff --git a/spindle/xrpc/xrpc.go b/spindle/xrpc/xrpc.go
index f4ab60a..6ad8673 100644
--- a/spindle/xrpc/xrpc.go
+++ b/spindle/xrpc/xrpc.go
@@ -17,7 +17,7 @@ import (
 	"tangled.sh/tangled.sh/core/rbac"
 	"tangled.sh/tangled.sh/core/spindle/config"
 	"tangled.sh/tangled.sh/core/spindle/db"
-	"tangled.sh/tangled.sh/core/spindle/engine"
+	"tangled.sh/tangled.sh/core/spindle/models"
 	"tangled.sh/tangled.sh/core/spindle/secrets"
 )
 
@@ -27,7 +27,7 @@ type Xrpc struct {
 	Logger   *slog.Logger
 	Db       *db.DB
 	Enforcer *rbac.Enforcer
-	Engine   *engine.Engine
+	Engines  map[string]models.Engine
 	Config   *config.Config
 	Resolver *idresolver.Resolver
 	Vault    secrets.Manager
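
Since dispatch keys off the engs map by name, adding an engine is one registration plus one implementation. A sketch of what a second registration in Run() could look like; the engines/docker package and its New constructor are hypothetical, assumed only to mirror nixery.New(ctx, cfg):

// hypothetical second engine, registered next to nixery in Run()
dockerEng, err := docker.New(ctx, cfg)
if err != nil {
	return err
}

engs := map[string]models.Engine{
	"nixery": nixeryEng,
	"docker": dockerEng, // selected by `engine: docker` in a workflow file
}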
diff --git a/workflow/compile.go b/workflow/compile.go
index 057ee59..f920ec4 100644
--- a/workflow/compile.go
+++ b/workflow/compile.go
@@ -1,6 +1,7 @@
 package workflow
 
 import (
+	"errors"
 	"fmt"
 
 	"tangled.sh/tangled.sh/core/api/tangled"
@@ -39,6 +40,10 @@ type Warning struct {
 	Reason string
 }
 
+var (
+	MissingEngine error = errors.New("missing engine")
+)
+
 type WarningKind string
 
 var (
@@ -55,19 +60,18 @@ func (compiler *Compiler) Compile(p Pipeline) tangled.Pipeline {
 	for _, w := range p {
 		cw := compiler.compileWorkflow(w)
 
-		// empty workflows are not added to the pipeline
-		if len(cw.Steps) == 0 {
+		if cw == nil {
 			continue
 		}
 
-		cp.Workflows = append(cp.Workflows, &cw)
+		cp.Workflows = append(cp.Workflows, cw)
 	}
 
 	return cp
 }
 
-func (compiler *Compiler) compileWorkflow(w Workflow) tangled.Pipeline_Workflow {
-	cw := tangled.Pipeline_Workflow{}
+func (compiler *Compiler) compileWorkflow(w Workflow) *tangled.Pipeline_Workflow {
+	cw := &tangled.Pipeline_Workflow{}
 
 	if !w.Match(compiler.Trigger) {
 		compiler.Diagnostics.AddWarning(
@@ -75,45 +79,22 @@ func (compiler *Compiler) compileWorkflow(w Workflow) tangled.Pipeline_Workflow
 			WorkflowSkipped,
 			fmt.Sprintf("did not match trigger %s", compiler.Trigger.Kind),
 		)
-		return cw
-	}
-
-	if len(w.Steps) == 0 {
-		compiler.Diagnostics.AddWarning(
-			w.Name,
-			WorkflowSkipped,
-			"empty workflow",
-		)
-		return cw
+		return nil
 	}
 
 	// validate clone options
 	compiler.analyzeCloneOptions(w)
 
 	cw.Name = w.Name
-	cw.Dependencies = w.Dependencies.AsRecord()
-	for _, s := range w.Steps {
-		step := tangled.Pipeline_Step{
-			Command: s.Command,
-			Name:    s.Name,
-		}
-		for k, v := range s.Environment {
-			e := &tangled.Pipeline_Pair{
-				Key:   k,
-				Value: v,
-			}
-			step.Environment = append(step.Environment, e)
-		}
-		cw.Steps = append(cw.Steps, &step)
-	}
-	for k, v := range w.Environment {
-		e := &tangled.Pipeline_Pair{
-			Key:   k,
-			Value: v,
-		}
-		cw.Environment = append(cw.Environment, e)
+
+	if w.Engine == "" {
+		compiler.Diagnostics.AddError(MissingEngine)
+		return nil
 	}
 
+	cw.Engine = w.Engine
+	cw.Raw = w.Raw
+
 	o := w.CloneOpts.AsRecord()
 	cw.Clone = &o
diff --git a/workflow/compile_test.go b/workflow/compile_test.go
index 9d005b8..245eba6 100644
--- a/workflow/compile_test.go
+++ b/workflow/compile_test.go
@@ -26,11 +26,9 @@ var when = []Constraint{
 
 func TestCompileWorkflow_MatchingWorkflowWithSteps(t *testing.T) {
 	wf := Workflow{
-		Name: ".tangled/workflows/test.yml",
-		When: when,
-		Steps: []Step{
-			{Name: "Test", Command: "go test ./..."},
-		},
+		Name:      ".tangled/workflows/test.yml",
+		Engine:    "nixery",
+		When:      when,
 		CloneOpts: CloneOpts{}, // default true
 	}
 
@@ -43,33 +41,16 @@ func TestCompileWorkflow_MatchingWorkflowWithSteps(t *testing.T) {
 	assert.False(t, c.Diagnostics.IsErr())
 }
 
-func TestCompileWorkflow_EmptySteps(t *testing.T) {
-	wf := Workflow{
-		Name:  ".tangled/workflows/empty.yml",
-		When:  when,
-		Steps: []Step{}, // no steps
-	}
-
-	c := Compiler{Trigger: trigger}
-	cp := c.Compile([]Workflow{wf})
-
-	assert.Len(t, cp.Workflows, 0)
-	assert.Len(t, c.Diagnostics.Warnings, 1)
-	assert.Equal(t, WorkflowSkipped, c.Diagnostics.Warnings[0].Type)
-}
-
 func TestCompileWorkflow_TriggerMismatch(t *testing.T) {
 	wf := Workflow{
-		Name: ".tangled/workflows/mismatch.yml",
+		Name:   ".tangled/workflows/mismatch.yml",
+		Engine: "nixery",
 		When: []Constraint{
 			{
 				Event:  []string{"push"},
 				Branch: []string{"master"}, // different branch
 			},
 		},
-		Steps: []Step{
-			{Name: "Lint", Command: "golint ./..."},
-		},
 	}
 
 	c := Compiler{Trigger: trigger}
@@ -82,11 +63,9 @@ func TestCompileWorkflow_TriggerMismatch(t *testing.T) {
 
 func TestCompileWorkflow_CloneFalseWithShallowTrue(t *testing.T) {
 	wf := Workflow{
-		Name: ".tangled/workflows/clone_skip.yml",
-		When: when,
-		Steps: []Step{
-			{Name: "Skip", Command: "echo skip"},
-		},
+		Name:   ".tangled/workflows/clone_skip.yml",
+		Engine: "nixery",
+		When:   when,
 		CloneOpts: CloneOpts{
 			Skip:  true,
 			Depth: 1,
@@ -101,3 +80,18 @@ func TestCompileWorkflow_CloneFalseWithShallowTrue(t *testing.T) {
 	assert.Len(t, c.Diagnostics.Warnings, 1)
 	assert.Equal(t, InvalidConfiguration, c.Diagnostics.Warnings[0].Type)
 }
+
+func TestCompileWorkflow_MissingEngine(t *testing.T) {
+	wf := Workflow{
+		Name:   ".tangled/workflows/missing_engine.yml",
+		When:   when,
+		Engine: "",
+	}
+
+	c := Compiler{Trigger: trigger}
+	cp := c.Compile([]Workflow{wf})
+
+	assert.Len(t, cp.Workflows, 0)
+	assert.Len(t, c.Diagnostics.Errors, 1)
+	assert.Equal(t, MissingEngine, c.Diagnostics.Errors[0])
+}
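
With steps and dependencies now engine-defined, only engine, when, and clone survive at the lexicon level; everything else rides along verbatim in Raw for the engine to interpret. A sketch in the style of def_test.go below (the file name and contents are illustrative):

// minimal workflow file under the new schema
yamlData := `
when:
  - event: ["push"]
    branch: ["main"]

engine: nixery

clone:
  depth: 1
`

wf, err := FromFile("ci.yml", []byte(yamlData))
// err == nil; wf.Engine == "nixery"; wf.Raw holds the full file contents,
// including any engine-specific step syntax the compiler no longer parses.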
diff --git a/workflow/def.go b/workflow/def.go
index 3d0acd3..85838d1 100644
--- a/workflow/def.go
+++ b/workflow/def.go
@@ -24,12 +24,11 @@ type (
 	// this is simply a structural representation of the workflow file
 	Workflow struct {
-		Name         string            `yaml:"-"` // name of the workflow file
-		When         []Constraint      `yaml:"when"`
-		Dependencies Dependencies      `yaml:"dependencies"`
-		Steps        []Step            `yaml:"steps"`
-		Environment  map[string]string `yaml:"environment"`
-		CloneOpts    CloneOpts         `yaml:"clone"`
+		Name      string       `yaml:"-"` // name of the workflow file
+		Engine    string       `yaml:"engine"`
+		When      []Constraint `yaml:"when"`
+		CloneOpts CloneOpts    `yaml:"clone"`
+		Raw       string       `yaml:"-"`
 	}
 
 	Constraint struct {
@@ -37,20 +36,12 @@ type (
 		Branch StringList `yaml:"branch"` // this is optional, and only applied on "push" events
 	}
 
-	Dependencies map[string][]string
-
 	CloneOpts struct {
 		Skip              bool `yaml:"skip"`
 		Depth             int  `yaml:"depth"`
 		IncludeSubmodules bool `yaml:"submodules"`
 	}
 
-	Step struct {
-		Name        string            `yaml:"name"`
-		Command     string            `yaml:"command"`
-		Environment map[string]string `yaml:"environment"`
-	}
-
 	StringList []string
 
 	TriggerKind string
@@ -77,6 +68,7 @@ func FromFile(name string, contents []byte) (Workflow, error) {
 	}
 
 	wf.Name = name
+	wf.Raw = string(contents)
 
 	return wf, nil
 }
@@ -175,25 +167,6 @@ func (s *StringList) UnmarshalYAML(unmarshal func(any) error) error {
 	return errors.New("failed to unmarshal StringOrSlice")
 }
 
-// conversion utilities to atproto records
-func (d Dependencies) AsRecord() []*tangled.Pipeline_Dependency {
-	var deps []*tangled.Pipeline_Dependency
-	for registry, packages := range d {
-		deps = append(deps, &tangled.Pipeline_Dependency{
-			Registry: registry,
-			Packages: packages,
-		})
-	}
-	return deps
-}
-
-func (s Step) AsRecord() tangled.Pipeline_Step {
-	return tangled.Pipeline_Step{
-		Command: s.Command,
-		Name:    s.Name,
-	}
-}
-
 func (c CloneOpts) AsRecord() tangled.Pipeline_CloneOpts {
 	return tangled.Pipeline_CloneOpts{
 		Depth: int64(c.Depth),
diff --git a/workflow/def_test.go b/workflow/def_test.go
index d29ab67..6d9fe6b 100644
--- a/workflow/def_test.go
+++ b/workflow/def_test.go
@@ -10,18 +10,7 @@ func TestUnmarshalWorkflow(t *testing.T) {
 	yamlData := `
 when:
   - event: ["push", "pull_request"]
-    branch: ["main", "develop"]
-
-dependencies:
-  nixpkgs:
-    - go
-    - git
-    - curl
-
-steps:
-  - name: "Test"
-    command: |
-      go test ./...`
+    branch: ["main", "develop"]`
 
 	wf, err := FromFile("test.yml", []byte(yamlData))
 	assert.NoError(t, err, "YAML should unmarshal without error")
@@ -30,44 +19,9 @@ steps:
 	assert.ElementsMatch(t, []string{"main", "develop"}, wf.When[0].Branch)
 	assert.ElementsMatch(t, []string{"push", "pull_request"}, wf.When[0].Event)
 
-	assert.Len(t, wf.Steps, 1)
-	assert.Equal(t, "Test", wf.Steps[0].Name)
-	assert.Equal(t, "go test ./...", wf.Steps[0].Command)
-
-	pkgs, ok := wf.Dependencies["nixpkgs"]
-	assert.True(t, ok, "`nixpkgs` should be present in dependencies")
-	assert.ElementsMatch(t, []string{"go", "git", "curl"}, pkgs)
-
 	assert.False(t, wf.CloneOpts.Skip, "Skip should default to false")
 }
 
-func TestUnmarshalCustomRegistry(t *testing.T) {
-	yamlData := `
-when:
-  - event: push
-    branch: main
-
-dependencies:
-  git+https://tangled.sh/@oppi.li/tbsp:
-    - tbsp
-  git+https://git.peppe.rs/languages/statix:
-    - statix
-
-steps:
-  - name: "Check"
-    command: |
-      statix check`
-
-	wf, err := FromFile("test.yml", []byte(yamlData))
-	assert.NoError(t, err, "YAML should unmarshal without error")
-
-	assert.ElementsMatch(t, []string{"push"}, wf.When[0].Event)
-	assert.ElementsMatch(t, []string{"main"}, wf.When[0].Branch)
-
-	assert.ElementsMatch(t, []string{"tbsp"}, wf.Dependencies["git+https://tangled.sh/@oppi.li/tbsp"])
-	assert.ElementsMatch(t, []string{"statix"}, wf.Dependencies["git+https://git.peppe.rs/languages/statix"])
-}
-
 func TestUnmarshalCloneFalse(t *testing.T) {
 	yamlData := `
 when:
@@ -75,15 +29,6 @@ when:
 
 clone:
   skip: true
-
-dependencies:
-  nixpkgs:
-    - python3
-
-steps:
-  - name: Notify
-    command: |
-      python3 ./notify.py
 `
 
 	wf, err := FromFile("test.yml", []byte(yamlData))
@@ -93,33 +38,3 @@ steps:
 
 	assert.True(t, wf.CloneOpts.Skip, "Skip should be false")
 }
-
-func TestUnmarshalEnv(t *testing.T) {
-	yamlData := `
-when:
-  - event: ["pull_request_close"]
-
-clone:
-  skip: false
-
-environment:
-  HOME: /home/foo bar/baz
-  CGO_ENABLED: 1
-
-steps:
-  - name: Something
-    command: echo "hello"
-    environment:
-      FOO: bar
-      BAZ: qux
-`
-
-	wf, err := FromFile("test.yml", []byte(yamlData))
-	assert.NoError(t, err)
-
-	assert.Len(t, wf.Environment, 2)
-	assert.Equal(t, "/home/foo bar/baz", wf.Environment["HOME"])
-	assert.Equal(t, "1", wf.Environment["CGO_ENABLED"])
-	assert.Equal(t, "bar", wf.Steps[0].Environment["FOO"])
assert.Equal(t, "qux", wf.Steps[0].Environment["BAZ"]) -} -- 2.43.0