···
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/image"
···
// StartSteps starts all steps sequentially with the same base image.
// ONLY marks pipeline as failed if container's exit code is non-zero.
// All other errors are bubbled up.
func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
-
// set up logging channels
if _, exists := e.stdoutChans[wid.String()]; !exists {
e.stdoutChans[wid.String()] = make(chan string, 100)
···
return fmt.Errorf("connecting network: %w", err)
-
err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
e.l.Info("started container", "name", resp.ID, "step", step.Name)
-
err := e.TailStep(ctx, resp.ID, wid)
-
e.l.Error("failed to tail container", "container", resp.ID)
-
// wait until all logs are piped
-
state, err := e.WaitStep(ctx, resp.ID)
err = e.DestroyStep(ctx, resp.ID)
···
-
return fmt.Errorf("error: %s, exit code: %d, oom: %s", state.Error, state.ExitCode, state.OOMKilled)
func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
···
_, err := stdcopy.StdCopy(wpipeOut, wpipeErr, tee)
-
if err != nil && err != io.EOF {
e.l.Error("failed to copy logs", "error", err)
···
if err := fn(ctx); err != nil {
-
e.l.Error("failed to cleanup workflow resource", "workflowId", wid)
···
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/image"
···
// StartSteps starts all steps sequentially with the same base image.
// ONLY marks pipeline as failed if container's exit code is non-zero.
// All other errors are bubbled up.
+
// Fixed version of the step execution logic
func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
+
stepTimeoutStr := e.cfg.Pipelines.StepTimeout
+
stepTimeout, err := time.ParseDuration(stepTimeoutStr)
+
e.l.Error("failed to parse step timeout", "error", err, "timeout", stepTimeoutStr)
+
stepTimeout = 5 * time.Minute
+
e.l.Info("using step timeout", "timeout", stepTimeout)
if _, exists := e.stdoutChans[wid.String()]; !exists {
e.stdoutChans[wid.String()] = make(chan string, 100)
···
return fmt.Errorf("connecting network: %w", err)
+
stepCtx, stepCancel := context.WithTimeout(ctx, stepTimeout)
+
err = e.docker.ContainerStart(stepCtx, resp.ID, container.StartOptions{})
e.l.Info("started container", "name", resp.ID, "step", step.Name)
+
// start tailing logs in background
+
tailDone := make(chan error, 1)
+
tailDone <- e.TailStep(stepCtx, resp.ID, wid)
+
// wait for container completion or timeout
+
waitDone := make(chan struct{})
+
var state *container.State
+
state, waitErr = e.WaitStep(stepCtx, resp.ID)
+
// container finished normally
+
// wait for tailing to complete
+
e.l.Warn("step timed out; killing container", "container", resp.ID, "timeout", stepTimeout)
+
_ = e.DestroyStep(ctx, resp.ID)
+
// wait for both goroutines to finish
+
return fmt.Errorf("step timed out after %v", stepTimeout)
err = e.DestroyStep(ctx, resp.ID)
···
+
return fmt.Errorf("error: %s, exit code: %d, oom: %t", state.Error, state.ExitCode, state.OOMKilled)
func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
···
_, err := stdcopy.StdCopy(wpipeOut, wpipeErr, tee)
+
if err != nil && err != io.EOF && !errors.Is(context.DeadlineExceeded, err) {
e.l.Error("failed to copy logs", "error", err)
···
if err := fn(ctx); err != nil {
+
e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)