From 93cfd4e431f5d01176760db1f1baa4cb61e32fe4 Mon Sep 17 00:00:00 2001 From: Anirudh Oppiliappan Date: Thu, 19 Jun 2025 18:11:32 +0300 Subject: [PATCH] spindle/{config,engine}: configure a timeout for steps Change-Id: ktmuymknvpwmztkwooqpxynuxomypwoo WaitStep and TailStep now run in goroutines, and are tracked with a stepCtx which has a timeout attached. Once stepCtx expires, the step is killed with DestroyStep. The default timeout is set to 5m, and is configureable using SPINDLE_PIPELINES_STEP_TIMEOUT. Signed-off-by: Anirudh Oppiliappan --- spindle/config/config.go | 3 +- spindle/engine/ansi_stripper.go | 4 +- spindle/engine/engine.go | 71 ++++++++++++++++++++++++--------- 3 files changed, 56 insertions(+), 22 deletions(-) diff --git a/spindle/config/config.go b/spindle/config/config.go index 502cea9..1750103 100644 --- a/spindle/config/config.go +++ b/spindle/config/config.go @@ -17,7 +17,8 @@ type Server struct { type Pipelines struct { // TODO: change default to nixery.tangled.sh - Nixery string `env:"NIXERY, default=nixery.dev"` + Nixery string `env:"NIXERY, default=nixery.dev"` + StepTimeout string `env:"STEP_TIMEOUT, default=5m"` } type Config struct { diff --git a/spindle/engine/ansi_stripper.go b/spindle/engine/ansi_stripper.go index 520ea8a..f4ef864 100644 --- a/spindle/engine/ansi_stripper.go +++ b/spindle/engine/ansi_stripper.go @@ -3,13 +3,13 @@ package engine import ( "io" - "github.com/go-enry/go-enry/v2/regex" + "regexp" ) // regex to match ANSI escape codes (e.g., color codes, cursor moves) const ansi = "[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))" -var re = regex.MustCompile(ansi) +var re = regexp.MustCompile(ansi) type ansiStrippingWriter struct { underlying io.Writer diff --git a/spindle/engine/engine.go b/spindle/engine/engine.go index 69b7008..6a7175c 100644 --- a/spindle/engine/engine.go +++ b/spindle/engine/engine.go @@ -3,12 +3,14 @@ package engine import ( "bufio" "context" + "errors" "fmt" "io" "log/slog" "os" "strings" "sync" + "time" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/image" @@ -176,8 +178,16 @@ func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error // StartSteps starts all steps sequentially with the same base image. // ONLY marks pipeline as failed if container's exit code is non-zero. // All other errors are bubbled up. +// Fixed version of the step execution logic func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error { - // set up logging channels + stepTimeoutStr := e.cfg.Pipelines.StepTimeout + stepTimeout, err := time.ParseDuration(stepTimeoutStr) + if err != nil { + e.l.Error("failed to parse step timeout", "error", err, "timeout", stepTimeoutStr) + stepTimeout = 5 * time.Minute + } + e.l.Info("using step timeout", "timeout", stepTimeout) + e.chanMu.Lock() if _, exists := e.stdoutChans[wid.String()]; !exists { e.stdoutChans[wid.String()] = make(chan string, 100) @@ -216,30 +226,54 @@ func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models return fmt.Errorf("connecting network: %w", err) } - err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) + stepCtx, stepCancel := context.WithTimeout(ctx, stepTimeout) + + err = e.docker.ContainerStart(stepCtx, resp.ID, container.StartOptions{}) if err != nil { + stepCancel() return err } e.l.Info("started container", "name", resp.ID, "step", step.Name) - wg := sync.WaitGroup{} + // start tailing logs in background + tailDone := make(chan error, 1) + go func() { + tailDone <- e.TailStep(stepCtx, resp.ID, wid) + }() + + // wait for container completion or timeout + waitDone := make(chan struct{}) + var state *container.State + var waitErr error - wg.Add(1) go func() { - defer wg.Done() - err := e.TailStep(ctx, resp.ID, wid) - if err != nil { - e.l.Error("failed to tail container", "container", resp.ID) - return - } + defer close(waitDone) + state, waitErr = e.WaitStep(stepCtx, resp.ID) }() - // wait until all logs are piped - wg.Wait() + select { + case <-waitDone: + // container finished normally + stepCancel() - state, err := e.WaitStep(ctx, resp.ID) - if err != nil { - return err + // wait for tailing to complete + <-tailDone + + case <-stepCtx.Done(): + e.l.Warn("step timed out; killing container", "container", resp.ID, "timeout", stepTimeout) + + _ = e.DestroyStep(ctx, resp.ID) + + // wait for both goroutines to finish + <-waitDone + <-tailDone + + stepCancel() + return fmt.Errorf("step timed out after %v", stepTimeout) + } + + if waitErr != nil { + return waitErr } err = e.DestroyStep(ctx, resp.ID) @@ -253,12 +287,11 @@ func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models if err != nil { return err } - return fmt.Errorf("error: %s, exit code: %d, oom: %s", state.Error, state.ExitCode, state.OOMKilled) + return fmt.Errorf("error: %s, exit code: %d, oom: %t", state.Error, state.ExitCode, state.OOMKilled) } } return nil - } func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) { @@ -318,7 +351,7 @@ func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.Wo defer wpipeOut.Close() defer wpipeErr.Close() _, err := stdcopy.StdCopy(wpipeOut, wpipeErr, tee) - if err != nil && err != io.EOF { + if err != nil && err != io.EOF && !errors.Is(context.DeadlineExceeded, err) { e.l.Error("failed to copy logs", "error", err) } }() @@ -393,7 +426,7 @@ func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) err for _, fn := range fns { if err := fn(ctx); err != nil { - e.l.Error("failed to cleanup workflow resource", "workflowId", wid) + e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err) } } return nil -- 2.43.0