From b1136410cf949c18708687b8b6f7dc306ef78f3a Mon Sep 17 00:00:00 2001 From: Anirudh Oppiliappan Date: Mon, 16 Jun 2025 23:16:18 +0300 Subject: [PATCH] spindle/engine: clean up envs, stdout on dev and use new models Change-Id: ulypoypunyzkvvwonroqkwtqkllrmxwt Signed-off-by: Anirudh Oppiliappan --- spindle/config/config.go | 8 +++- spindle/engine/ansi_stripper.go | 21 ++++++++++ spindle/engine/engine.go | 70 ++++++++++++++++++++------------- spindle/engine/envs.go | 12 ++---- spindle/engine/envs_test.go | 23 ++--------- spindle/models/pipeline.go | 12 +++--- spindle/server.go | 22 ++++++----- 7 files changed, 96 insertions(+), 72 deletions(-) create mode 100644 spindle/engine/ansi_stripper.go diff --git a/spindle/config/config.go b/spindle/config/config.go index 72a5036..502cea9 100644 --- a/spindle/config/config.go +++ b/spindle/config/config.go @@ -15,8 +15,14 @@ type Server struct { Owner string `env:"OWNER, required"` } +type Pipelines struct { + // TODO: change default to nixery.tangled.sh + Nixery string `env:"NIXERY, default=nixery.dev"` +} + type Config struct { - Server Server `env:",prefix=SPINDLE_SERVER_"` + Server Server `env:",prefix=SPINDLE_SERVER_"` + Pipelines Pipelines `env:",prefix=SPINDLE_PIPELINES_"` } func Load(ctx context.Context) (*Config, error) { diff --git a/spindle/engine/ansi_stripper.go b/spindle/engine/ansi_stripper.go new file mode 100644 index 0000000..520ea8a --- /dev/null +++ b/spindle/engine/ansi_stripper.go @@ -0,0 +1,21 @@ +package engine + +import ( + "io" + + "github.com/go-enry/go-enry/v2/regex" +) + +// regex to match ANSI escape codes (e.g., color codes, cursor moves) +const ansi = "[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))" + +var re = regex.MustCompile(ansi) + +type ansiStrippingWriter struct { + underlying io.Writer +} + +func (w *ansiStrippingWriter) Write(p []byte) (int, error) { + clean := re.ReplaceAll(p, []byte{}) + return 
w.underlying.Write(clean) +} diff --git a/spindle/engine/engine.go b/spindle/engine/engine.go index dd2311e..69b7008 100644 --- a/spindle/engine/engine.go +++ b/spindle/engine/engine.go @@ -7,7 +7,6 @@ import ( "io" "log/slog" "os" - "path" "strings" "sync" @@ -18,9 +17,9 @@ import ( "github.com/docker/docker/api/types/volume" "github.com/docker/docker/client" "github.com/docker/docker/pkg/stdcopy" - "tangled.sh/tangled.sh/core/api/tangled" "tangled.sh/tangled.sh/core/log" "tangled.sh/tangled.sh/core/notifier" + "tangled.sh/tangled.sh/core/spindle/config" "tangled.sh/tangled.sh/core/spindle/db" "tangled.sh/tangled.sh/core/spindle/models" ) @@ -36,6 +35,7 @@ type Engine struct { l *slog.Logger db *db.DB n *notifier.Notifier + cfg *config.Config chanMu sync.RWMutex stdoutChans map[string]chan string @@ -45,7 +45,7 @@ type Engine struct { cleanup map[string][]cleanupFunc } -func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) { +func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) { dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) if err != nil { return nil, err @@ -58,6 +58,7 @@ func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) l: l, db: db, n: n, + cfg: cfg, } e.stdoutChans = make(map[string]chan string, 100) @@ -68,7 +69,7 @@ func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) return e, nil } -func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, pipelineId models.PipelineId) { +func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) { e.l.Info("starting all workflows in parallel", "pipeline", pipelineId) wg := sync.WaitGroup{} @@ -93,19 +94,7 @@ func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, } defer e.DestroyWorkflow(ctx, wid) - // TODO: actual checks for image/registry etc. 
- var deps string - for _, d := range w.Dependencies { - if d.Registry == "nixpkgs" { - deps = path.Join(d.Packages...) - } - } - - // load defaults from somewhere else - deps = path.Join(deps, "bash", "git", "coreutils", "nix") - - cimg := path.Join("nixery.dev", deps) - reader, err := e.docker.ImagePull(ctx, cimg, image.PullOptions{}) + reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{}) if err != nil { e.l.Error("pipeline failed!", "workflowId", wid, "error", err.Error()) @@ -119,13 +108,13 @@ func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, defer reader.Close() io.Copy(os.Stdout, reader) - err = e.StartSteps(ctx, w.Steps, wid, cimg) + err = e.StartSteps(ctx, w.Steps, wid, w.Image) if err != nil { e.l.Error("workflow failed!", "wid", wid.String(), "error", err.Error()) - err := e.db.StatusFailed(wid, err.Error(), -1, e.n) - if err != nil { - return err + dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n) + if dbErr != nil { + return dbErr } return fmt.Errorf("starting steps image: %w", err) @@ -187,7 +176,7 @@ func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error // StartSteps starts all steps sequentially with the same base image. // ONLY marks pipeline as failed if container's exit code is non-zero. // All other errors are bubbled up. 
-func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, wid models.WorkflowId, image string) error { +func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error { // set up logging channels e.chanMu.Lock() if _, exists := e.stdoutChans[wid.String()]; !exists { @@ -259,8 +248,12 @@ func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, } if state.ExitCode != 0 { - e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode) - return fmt.Errorf("%s", state.Error) + e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled) + err := e.db.StatusFailed(wid, state.Error, int64(state.ExitCode), e.n) + if err != nil { + return err + } + return fmt.Errorf("error: %s, exit code: %d, oom: %t", state.Error, state.ExitCode, state.OOMKilled) } } @@ -293,13 +286,20 @@ func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.Wo Follow: true, ShowStdout: true, ShowStderr: true, - Details: false, + Details: true, Timestamps: false, }) if err != nil { return err } + var devOutput io.Writer = io.Discard + if e.cfg.Server.Dev { + devOutput = &ansiStrippingWriter{underlying: os.Stdout} + } + + tee := io.TeeReader(logs, devOutput) + // using StdCopy we demux logs and stream stdout and stderr to different // channels.
// @@ -310,10 +310,14 @@ func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.Wo rpipeOut, wpipeOut := io.Pipe() rpipeErr, wpipeErr := io.Pipe() + wg := sync.WaitGroup{} + + wg.Add(1) go func() { + defer wg.Done() defer wpipeOut.Close() defer wpipeErr.Close() - _, err := stdcopy.StdCopy(wpipeOut, wpipeErr, logs) + _, err := stdcopy.StdCopy(wpipeOut, wpipeErr, tee) if err != nil && err != io.EOF { e.l.Error("failed to copy logs", "error", err) } @@ -322,7 +326,9 @@ func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.Wo // read from stdout and send to stdout pipe // NOTE: the stdoutCh channnel is closed further up in StartSteps // once all steps are done. + wg.Add(1) go func() { + defer wg.Done() e.chanMu.RLock() stdoutCh := e.stdoutChans[wid.String()] e.chanMu.RUnlock() @@ -339,7 +345,9 @@ func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.Wo // read from stderr and send to stderr pipe // NOTE: the stderrCh channnel is closed further up in StartSteps // once all steps are done. 
+ wg.Add(1) go func() { + defer wg.Done() e.chanMu.RLock() stderrCh := e.stderrChans[wid.String()] e.chanMu.RUnlock() @@ -353,6 +361,8 @@ func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.Wo } }() + wg.Wait() + return nil } @@ -435,10 +445,14 @@ func hostConfig(wid models.WorkflowId) *container.HostConfig { Source: nixVolume(wid), Target: "/nix", }, + { + Type: mount.TypeTmpfs, + Target: "/tmp", + }, }, - ReadonlyRootfs: true, + ReadonlyRootfs: false, CapDrop: []string{"ALL"}, - SecurityOpt: []string{"no-new-privileges"}, + SecurityOpt: []string{"seccomp=unconfined"}, } return hostConfig diff --git a/spindle/engine/envs.go b/spindle/engine/envs.go index bfe6886..440a9a6 100644 --- a/spindle/engine/envs.go +++ b/spindle/engine/envs.go @@ -2,21 +2,17 @@ package engine import ( "fmt" - - "tangled.sh/tangled.sh/core/api/tangled" ) type EnvVars []string // ConstructEnvs converts a tangled.Pipeline_Step_Environment_Elem.{Key,Value} // representation into a docker-friendly []string{"KEY=value", ...} slice. 
-func ConstructEnvs(envs []*tangled.Pipeline_Step_Environment_Elem) EnvVars { +func ConstructEnvs(envs map[string]string) EnvVars { var dockerEnvs EnvVars - for _, env := range envs { - if env != nil { - ev := fmt.Sprintf("%s=%s", env.Key, env.Value) - dockerEnvs = append(dockerEnvs, ev) - } + for k, v := range envs { + ev := fmt.Sprintf("%s=%s", k, v) + dockerEnvs = append(dockerEnvs, ev) } return dockerEnvs } diff --git a/spindle/engine/envs_test.go b/spindle/engine/envs_test.go index 8554a1e..912a4e5 100644 --- a/spindle/engine/envs_test.go +++ b/spindle/engine/envs_test.go @@ -3,44 +3,29 @@ package engine import ( "reflect" "testing" - - "tangled.sh/tangled.sh/core/api/tangled" ) func TestConstructEnvs(t *testing.T) { tests := []struct { name string - in []*tangled.Pipeline_Step_Environment_Elem + in map[string]string want EnvVars }{ { name: "empty input", - in: []*tangled.Pipeline_Step_Environment_Elem{}, + in: make(map[string]string), want: EnvVars{}, }, { name: "single env var", - in: []*tangled.Pipeline_Step_Environment_Elem{ - {Key: "FOO", Value: "bar"}, - }, + in: map[string]string{"FOO": "bar"}, want: EnvVars{"FOO=bar"}, }, { name: "multiple env vars", - in: []*tangled.Pipeline_Step_Environment_Elem{ - {Key: "FOO", Value: "bar"}, - {Key: "BAZ", Value: "qux"}, - }, + in: map[string]string{"FOO": "bar", "BAZ": "qux"}, want: EnvVars{"FOO=bar", "BAZ=qux"}, }, - { - name: "nil entries are skipped", - in: []*tangled.Pipeline_Step_Environment_Elem{ - nil, - {Key: "FOO", Value: "bar"}, - }, - want: EnvVars{"FOO=bar"}, - }, } for _, tt := range tests { diff --git a/spindle/models/pipeline.go b/spindle/models/pipeline.go index c7050c6..5fbfea0 100644 --- a/spindle/models/pipeline.go +++ b/spindle/models/pipeline.go @@ -4,6 +4,7 @@ import ( "path" "tangled.sh/tangled.sh/core/api/tangled" + "tangled.sh/tangled.sh/core/spindle/config" ) type Pipeline struct { @@ -35,7 +36,7 @@ func (ss *setupSteps) addStep(step Step) { // In the process, dependencies are resolved: 
nixpkgs deps // are constructed atop nixery and set as the Workflow.Image, // and ones from custom registries -func ToPipeline(pl tangled.Pipeline, dev bool) *Pipeline { +func ToPipeline(pl tangled.Pipeline, cfg config.Config) *Pipeline { workflows := []Workflow{} for _, twf := range pl.Workflows { @@ -49,12 +50,12 @@ func ToPipeline(pl tangled.Pipeline, dev bool) *Pipeline { } swf.Name = twf.Name swf.Environment = workflowEnvToMap(twf.Environment) - swf.Image = workflowImage(twf.Dependencies) + swf.Image = workflowImage(twf.Dependencies, cfg.Pipelines.Nixery) swf.addNixProfileToPath() setup := &setupSteps{} - setup.addStep(cloneStep(*twf, *pl.TriggerMetadata.Repo, dev)) + setup.addStep(cloneStep(*twf, *pl.TriggerMetadata.Repo, cfg.Server.Dev)) setup.addStep(checkoutStep(*twf, *pl.TriggerMetadata)) setup.addStep(dependencyStep(*twf)) @@ -82,7 +83,7 @@ func stepEnvToMap(envs []*tangled.Pipeline_Step_Environment_Elem) map[string]str return envMap } -func workflowImage(deps []tangled.Pipeline_Dependencies_Elem) string { +func workflowImage(deps []tangled.Pipeline_Dependencies_Elem, nixery string) string { var dependencies string for _, d := range deps { if d.Registry == "nixpkgs" { @@ -93,8 +94,7 @@ func workflowImage(deps []tangled.Pipeline_Dependencies_Elem) string { // load defaults from somewhere else dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix") - // TODO: this should use nixery from the config - return path.Join("nixery.dev", dependencies) + return path.Join(nixery, dependencies) } func (wf *Workflow) addNixProfileToPath() { diff --git a/spindle/server.go b/spindle/server.go index 3203d65..87d7f2c 100644 --- a/spindle/server.go +++ b/spindle/server.go @@ -59,7 +59,7 @@ func Run(ctx context.Context) error { n := notifier.New() - eng, err := engine.New(ctx, d, &n) + eng, err := engine.New(ctx, cfg, d, &n) if err != nil { return err } @@ -153,26 +153,26 @@ func (s *Spindle) Router() http.Handler { func (s *Spindle) processPipeline(ctx 
context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { if msg.Nsid == tangled.PipelineNSID { - pipeline := tangled.Pipeline{} - err := json.Unmarshal(msg.EventJson, &pipeline) + tpl := tangled.Pipeline{} + err := json.Unmarshal(msg.EventJson, &tpl) if err != nil { fmt.Println("error unmarshalling", err) return err } - if pipeline.TriggerMetadata == nil { + if tpl.TriggerMetadata == nil { return fmt.Errorf("no trigger metadata found") } - if pipeline.TriggerMetadata.Repo == nil { + if tpl.TriggerMetadata.Repo == nil { return fmt.Errorf("no repo data found") } // filter by repos _, err = s.db.GetRepo( - pipeline.TriggerMetadata.Repo.Knot, - pipeline.TriggerMetadata.Repo.Did, - pipeline.TriggerMetadata.Repo.Repo, + tpl.TriggerMetadata.Repo.Knot, + tpl.TriggerMetadata.Repo.Did, + tpl.TriggerMetadata.Repo.Repo, ) if err != nil { return err @@ -183,7 +183,7 @@ func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, Rkey: msg.Rkey, } - for _, w := range pipeline.Workflows { + for _, w := range tpl.Workflows { if w != nil { err := s.db.StatusPending(models.WorkflowId{ PipelineId: pipelineId, @@ -195,9 +195,11 @@ func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, } } + spl := models.ToPipeline(tpl, *s.cfg) + ok := s.jq.Enqueue(queue.Job{ Run: func() error { - s.eng.StartWorkflows(ctx, &pipeline, pipelineId) + s.eng.StartWorkflows(ctx, spl, pipelineId) return nil }, OnFail: func(jobError error) { -- 2.43.0