···
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
e.l.Info("setting up workflow", "workflow", wid)
- _, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
- e.registerCleanup(wid, func(ctx context.Context) error {
- return e.docker.NetworkRemove(ctx, networkName(wid))
addl := wf.Data.(addlFields)
reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
···
io.Copy(os.Stdout, reader)
resp, err := e.docker.ContainerCreate(ctx, &container.Config{
···
for _, s := range secrets {
workflowEnvs.AddEnv(s.Key, s.Value)
step := w.Steps[idx].(Step)
envs := append(EnvVars(nil), workflowEnvs...)
for k, v := range step.environment {
envs.AddEnv("HOME", homeDir)
mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{
Cmd: []string{"bash", "-c", step.command},
···
// cleanup will be handled by DestroyWorkflow, since
// Docker doesn't provide an API to kill an exec run
// (sure, we could grab the PID and kill it ourselves,
// but that's wasted effort)
e.l.Warn("step timed out", "step", step.Name)
return engine.ErrTimedOut
···
- execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
- if execInspectResp.ExitCode != 0 {
- inspectResp, err := e.docker.ContainerInspect(ctx, addl.container)
- e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled)
- if inspectResp.State.OOMKilled {
- return engine.ErrWorkflowFailed
func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
···
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
e.l.Info("setting up workflow", "workflow", wid)
addl := wf.Data.(addlFields)
reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
···
io.Copy(os.Stdout, reader)
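// Aside: ImagePull only makes progress as its JSON message stream is read,
// so the io.Copy above is what actually drives the pull to completion. A
// quieter sketch of the same step (nothing assumed beyond the Docker SDK):
//
//	reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
//	if err != nil {
//		return fmt.Errorf("pulling %s: %w", addl.image, err)
//	}
//	defer reader.Close()
//	if _, err := io.Copy(io.Discard, reader); err != nil {
//		return fmt.Errorf("draining pull of %s: %w", addl.image, err)
//	}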
+ _, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
+ e.registerCleanup(wid, func(ctx context.Context) error {
+ return e.docker.NetworkRemove(ctx, networkName(wid))
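// registerCleanup is not shown in this excerpt. A plausible sketch of the
// pattern, with hypothetical mu/cleanups fields on Engine, is a per-workflow
// list of closures that DestroyWorkflow drains in reverse order, so the
// network goes away only after the containers attached to it:
//
//	func (e *Engine) registerCleanup(wid models.WorkflowId, fn func(context.Context) error) {
//		e.mu.Lock()
//		defer e.mu.Unlock()
//		e.cleanups[wid] = append(e.cleanups[wid], fn)
//	}
//
//	func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
//		e.mu.Lock()
//		fns := e.cleanups[wid]
//		delete(e.cleanups, wid)
//		e.mu.Unlock()
//		var errs []error
//		for i := len(fns) - 1; i >= 0; i-- { // LIFO: reverse of registration
//			errs = append(errs, fns[i](ctx))
//		}
//		return errors.Join(errs...)
//	}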
resp, err := e.docker.ContainerCreate(ctx, &container.Config{
···
for _, s := range secrets {
workflowEnvs.AddEnv(s.Key, s.Value)
step := w.Steps[idx].(Step)
envs := append(EnvVars(nil), workflowEnvs...)
for k, v := range step.environment {
envs.AddEnv("HOME", homeDir)
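// The append(EnvVars(nil), workflowEnvs...) idiom above clones the shared
// workflow-level slice, so per-step AddEnv calls never mutate it across
// steps. EnvVars itself is not shown; assuming it wraps Docker's
// "KEY=value" string form:
//
//	type EnvVars []string
//
//	func (e *EnvVars) AddEnv(key, value string) {
//		*e = append(*e, fmt.Sprintf("%s=%s", key, value))
//	}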
+ e.l.Info("executing step",
+ "workflow_id", wid.String(),
+ "step_name", step.Name,
+ "command", step.command,
mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{
Cmd: []string{"bash", "-c", step.command},
···
// cleanup will be handled by DestroyWorkflow, since
// Docker doesn't provide an API to kill an exec run
// (sure, we could grab the PID and kill it ourselves,
// but that's wasted effort)
e.l.Warn("step timed out", "step", step.Name)
return engine.ErrTimedOut
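// For the record, the dismissed "grab the PID" route would look roughly like
// the sketch below. Note ContainerExecInspect reports the daemon-side PID,
// so signaling it directly only works when the engine shares a host with
// the Docker daemon:
//
//	inspect, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
//	if err == nil && inspect.Running && inspect.Pid > 0 {
//		_ = syscall.Kill(inspect.Pid, syscall.SIGKILL)
//	}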
···
+ if err = e.handleStepFailure(ctx, wid, w, idx, mkExecResp.ID); err != nil {
+ e.l.Info("step completed successfully",
+ "workflow_id", wid.String(),
+ "step_name", step.Name,
+ // handleStepFailure logs detailed information about a failed workflow step
+ func (e *Engine) handleStepFailure(
+ addl := w.Data.(addlFields)
+ step := w.Steps[idx].(Step)
+ inspectResp, err := e.docker.ContainerInspect(ctx, addl.container)
+ execInspectResp, err := e.docker.ContainerExecInspect(ctx, execID)
+ if execInspectResp.ExitCode == 0 {
+ "workflow_id", wid.String(),
+ "step_name", step.Name,
+ "command", step.command,
+ "exec_exit_code", execInspectResp.ExitCode,
+ // Add container state information (State is a pointer, so guard the dereferences)
+ if inspectResp.State != nil {
+ logFields = append(logFields,
+ "container_exit_code", inspectResp.State.ExitCode,
+ "container_oom_killed", inspectResp.State.OOMKilled,
+ "container_status", inspectResp.State.Status,
+ "container_running", inspectResp.State.Running,
+ "container_paused", inspectResp.State.Paused,
+ "container_restarting", inspectResp.State.Restarting,
+ "container_dead", inspectResp.State.Dead,
+ if inspectResp.State.Error != "" {
+ logFields = append(logFields, "container_error", inspectResp.State.Error)
+ if inspectResp.State.StartedAt != "" {
+ logFields = append(logFields, "container_started_at", inspectResp.State.StartedAt)
+ if inspectResp.State.FinishedAt != "" {
+ logFields = append(logFields, "container_finished_at", inspectResp.State.FinishedAt)
+ // Add resource usage if available
+ if inspectResp.HostConfig != nil && inspectResp.HostConfig.Memory > 0 {
+ logFields = append(logFields, "memory_limit", inspectResp.HostConfig.Memory)
+ e.l.Error("workflow step failed!", logFields...)
+ if inspectResp.State != nil && inspectResp.State.OOMKilled {
+ return engine.ErrWorkflowFailed
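// handleStepFailure's parameter list is elided above; reconstructed from its
// call site, it is presumably:
//
//	func (e *Engine) handleStepFailure(
//		ctx context.Context,
//		wid models.WorkflowId,
//		w *models.Workflow,
//		idx int,
//		execID string,
//	) error {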
func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
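// tailStep's body is elided here. Streaming exec output typically means
// ContainerExecAttach (which also starts the exec) plus stdcopy to split the
// multiplexed stream; a sketch, with hypothetical stdoutSink/stderrSink
// writers fed from wfLogger:
//
//	attach, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{})
//	if err != nil {
//		return err
//	}
//	defer attach.Close()
//	_, err = stdcopy.StdCopy(stdoutSink, stderrSink, attach.Reader)
//	return err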