···
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/image"
···
// StartSteps starts all steps sequentially with the same base image.
// ONLY marks pipeline as failed if container's exit code is non-zero.
// All other errors are bubbled up.
181
+
// Fixed version of the step execution logic
func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
180
-
// set up logging channels
183
+
stepTimeoutStr := e.cfg.Pipelines.StepTimeout
184
+
stepTimeout, err := time.ParseDuration(stepTimeoutStr)
186
+
e.l.Error("failed to parse step timeout", "error", err, "timeout", stepTimeoutStr)
187
+
stepTimeout = 5 * time.Minute
189
+
e.l.Info("using step timeout", "timeout", stepTimeout)
if _, exists := e.stdoutChans[wid.String()]; !exists {
e.stdoutChans[wid.String()] = make(chan string, 100)
···
return fmt.Errorf("connecting network: %w", err)
219
-
err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
229
+
stepCtx, stepCancel := context.WithTimeout(ctx, stepTimeout)
231
+
err = e.docker.ContainerStart(stepCtx, resp.ID, container.StartOptions{})
e.l.Info("started container", "name", resp.ID, "step", step.Name)
225
-
wg := sync.WaitGroup{}
238
+
// start tailing logs in background
239
+
tailDone := make(chan error, 1)
241
+
tailDone <- e.TailStep(stepCtx, resp.ID, wid)
244
+
// wait for container completion or timeout
245
+
waitDone := make(chan struct{})
246
+
var state *container.State
230
-
err := e.TailStep(ctx, resp.ID, wid)
232
-
e.l.Error("failed to tail container", "container", resp.ID)
250
+
defer close(waitDone)
251
+
state, waitErr = e.WaitStep(stepCtx, resp.ID)
237
-
// wait until all logs are piped
256
+
// container finished normally
240
-
state, err := e.WaitStep(ctx, resp.ID)
259
+
// wait for tailing to complete
262
+
case <-stepCtx.Done():
263
+
e.l.Warn("step timed out; killing container", "container", resp.ID, "timeout", stepTimeout)
265
+
_ = e.DestroyStep(ctx, resp.ID)
267
+
// wait for both goroutines to finish
272
+
return fmt.Errorf("step timed out after %v", stepTimeout)
275
+
if waitErr != nil {
err = e.DestroyStep(ctx, resp.ID)
···
256
-
return fmt.Errorf("error: %s, exit code: %d, oom: %s", state.Error, state.ExitCode, state.OOMKilled)
290
+
return fmt.Errorf("error: %s, exit code: %d, oom: %t", state.Error, state.ExitCode, state.OOMKilled)
func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
···
_, err := stdcopy.StdCopy(wpipeOut, wpipeErr, tee)
321
-
if err != nil && err != io.EOF {
354
+
if err != nil && err != io.EOF && !errors.Is(context.DeadlineExceeded, err) {
e.l.Error("failed to copy logs", "error", err)
···
if err := fn(ctx); err != nil {
396
-
e.l.Error("failed to cleanup workflow resource", "workflowId", wid)
429
+
e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)