spindle/{config,engine}: configure a timeout for steps #259

merged
opened by anirudh.fi targeting master from push-momltwttmuyq

WaitStep and TailStep now run in goroutines, and are tracked with a stepCtx which has a timeout attached. Once stepCtx expires, the step is killed with DestroyStep.

The default timeout is set to 5m, and is configureable using SPINDLE_PIPELINES_STEP_TIMEOUT.

Signed-off-by: Anirudh Oppiliappan anirudh@tangled.sh

Changed files
+56 -22
spindle
+2 -1
spindle/config/config.go
···
type Pipelines struct {
// TODO: change default to nixery.tangled.sh
-
Nixery string `env:"NIXERY, default=nixery.dev"`
+
Nixery string `env:"NIXERY, default=nixery.dev"`
+
StepTimeout string `env:"STEP_TIMEOUT, default=5m"`
}
type Config struct {
+2 -2
spindle/engine/ansi_stripper.go
···
import (
"io"
-
"github.com/go-enry/go-enry/v2/regex"
+
"regexp"
)
// regex to match ANSI escape codes (e.g., color codes, cursor moves)
const ansi = "[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))"
-
var re = regex.MustCompile(ansi)
+
var re = regexp.MustCompile(ansi)
type ansiStrippingWriter struct {
underlying io.Writer
+52 -19
spindle/engine/engine.go
···
import (
"bufio"
"context"
+
"errors"
"fmt"
"io"
"log/slog"
"os"
"strings"
"sync"
+
"time"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/image"
···
// StartSteps starts all steps sequentially with the same base image.
// ONLY marks pipeline as failed if container's exit code is non-zero.
// All other errors are bubbled up.
+
// Fixed version of the step execution logic
func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
-
// set up logging channels
+
stepTimeoutStr := e.cfg.Pipelines.StepTimeout
+
stepTimeout, err := time.ParseDuration(stepTimeoutStr)
+
if err != nil {
+
e.l.Error("failed to parse step timeout", "error", err, "timeout", stepTimeoutStr)
+
stepTimeout = 5 * time.Minute
+
}
+
e.l.Info("using step timeout", "timeout", stepTimeout)
+
e.chanMu.Lock()
if _, exists := e.stdoutChans[wid.String()]; !exists {
e.stdoutChans[wid.String()] = make(chan string, 100)
···
return fmt.Errorf("connecting network: %w", err)
}
-
err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
+
stepCtx, stepCancel := context.WithTimeout(ctx, stepTimeout)
+
+
err = e.docker.ContainerStart(stepCtx, resp.ID, container.StartOptions{})
if err != nil {
+
stepCancel()
return err
}
e.l.Info("started container", "name", resp.ID, "step", step.Name)
-
wg := sync.WaitGroup{}
+
// start tailing logs in background
+
tailDone := make(chan error, 1)
+
go func() {
+
tailDone <- e.TailStep(stepCtx, resp.ID, wid)
+
}()
+
+
// wait for container completion or timeout
+
waitDone := make(chan struct{})
+
var state *container.State
+
var waitErr error
-
wg.Add(1)
go func() {
-
defer wg.Done()
-
err := e.TailStep(ctx, resp.ID, wid)
-
if err != nil {
-
e.l.Error("failed to tail container", "container", resp.ID)
-
return
-
}
+
defer close(waitDone)
+
state, waitErr = e.WaitStep(stepCtx, resp.ID)
}()
-
// wait until all logs are piped
-
wg.Wait()
+
select {
+
case <-waitDone:
+
// container finished normally
+
stepCancel()
-
state, err := e.WaitStep(ctx, resp.ID)
-
if err != nil {
-
return err
+
// wait for tailing to complete
+
<-tailDone
+
+
case <-stepCtx.Done():
+
e.l.Warn("step timed out; killing container", "container", resp.ID, "timeout", stepTimeout)
+
+
_ = e.DestroyStep(ctx, resp.ID)
+
+
// wait for both goroutines to finish
+
<-waitDone
+
<-tailDone
+
+
stepCancel()
+
return fmt.Errorf("step timed out after %v", stepTimeout)
+
}
+
+
if waitErr != nil {
+
return waitErr
}
err = e.DestroyStep(ctx, resp.ID)
···
if err != nil {
return err
}
-
return fmt.Errorf("error: %s, exit code: %d, oom: %s", state.Error, state.ExitCode, state.OOMKilled)
+
return fmt.Errorf("error: %s, exit code: %d, oom: %t", state.Error, state.ExitCode, state.OOMKilled)
}
}
return nil
-
}
func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
···
defer wpipeOut.Close()
defer wpipeErr.Close()
_, err := stdcopy.StdCopy(wpipeOut, wpipeErr, tee)
-
if err != nil && err != io.EOF {
+
if err != nil && err != io.EOF && !errors.Is(context.DeadlineExceeded, err) {
e.l.Error("failed to copy logs", "error", err)
}
}()
···
for _, fn := range fns {
if err := fn(ctx); err != nil {
-
e.l.Error("failed to cleanup workflow resource", "workflowId", wid)
+
e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
}
}
return nil