···
···
"github.com/docker/docker/api/types/image"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/network"
- "github.com/docker/docker/api/types/volume"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
···
func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
···
- // SetupWorkflow sets up a new network for the workflow and volumes for
- // the workspace and Nix store. These are persisted across steps and are
- // destroyed at the end of the workflow.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
e.l.Info("setting up workflow", "workflow", wid)
- _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
- Name: workspaceVolume(wid),
e.registerCleanup(wid, func(ctx context.Context) error {
- return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
- _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
e.registerCleanup(wid, func(ctx context.Context) error {
- return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
- _, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
- e.registerCleanup(wid, func(ctx context.Context) error {
- return e.docker.NetworkRemove(ctx, networkName(wid))
- addl := wf.Data.(addlFields)
- reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
- e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())
- return fmt.Errorf("pulling image: %w", err)
- io.Copy(os.Stdout, reader)
func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error {
- workflowEnvs := ConstructEnvs(w.Data.(addlFields).env)
for _, s := range secrets {
workflowEnvs.AddEnv(s.Key, s.Value)
···
envs.AddEnv("HOME", workspaceDir)
e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())
- hostConfig := hostConfig(wid)
- resp, err := e.docker.ContainerCreate(ctx, &container.Config{
- Image: w.Data.(addlFields).image,
- Cmd: []string{"bash", "-c", step.command},
- WorkingDir: workspaceDir,
- }, hostConfig, nil, nil, "")
- defer e.DestroyStep(ctx, resp.ID)
- return fmt.Errorf("creating container: %w", err)
- err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
- return fmt.Errorf("connecting network: %w", err)
- err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
- e.l.Info("started container", "name", resp.ID, "step", step.Name)
// start tailing logs in background
tailDone := make(chan error, 1)
- tailDone <- e.tailStep(ctx, wfLogger, resp.ID, wid, idx, step)
- // wait for container completion or timeout
- waitDone := make(chan struct{})
- var state *container.State
- state, waitErr = e.WaitStep(ctx, resp.ID)
- // wait for tailing to complete
- e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name)
- err = e.DestroyStep(context.Background(), resp.ID)
- e.l.Error("failed to destroy step", "container", resp.ID, "error", err)
- // wait for both goroutines to finish
return engine.ErrTimedOut
···
- err = e.DestroyStep(ctx, resp.ID)
- if state.ExitCode != 0 {
- e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
- return engine.ErrWorkflowFailed
- func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
- wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
- e.l.Info("waited for container", "name", containerID)
- info, err := e.docker.ContainerInspect(ctx, containerID)
- func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
- logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
_, err = stdcopy.StdCopy(
wfLogger.DataWriter("stdout"),
wfLogger.DataWriter("stderr"),
if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
return fmt.Errorf("failed to copy logs: %w", err)
···
- func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
- err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
- if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
- if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
- }); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
···
e.cleanup[key] = append(e.cleanup[key], fn)
- func workspaceVolume(wid models.WorkflowId) string {
- return fmt.Sprintf("workspace-%s", wid)
- func nixVolume(wid models.WorkflowId) string {
- return fmt.Sprintf("nix-%s", wid)
func networkName(wid models.WorkflowId) string {
return fmt.Sprintf("workflow-network-%s", wid)
- func hostConfig(wid models.WorkflowId) *container.HostConfig {
- hostConfig := &container.HostConfig{
- Type: mount.TypeVolume,
- Source: workspaceVolume(wid),
- Type: mount.TypeVolume,
- Source: nixVolume(wid),
- TmpfsOptions: &mount.TmpfsOptions{
- Mode: 0o1777, // world-writable with sticky bit
- Type: mount.TypeVolume,
- Source: "etc-nix-" + wid.String(),
- CapDrop: []string{"ALL"},
- CapAdd: []string{"CAP_DAC_OVERRIDE"},
- SecurityOpt: []string{"no-new-privileges"},
- ExtraHosts: []string{"host.docker.internal:host-gateway"},
- func isErrContainerNotFoundOrNotRunning(err error) bool {
- // Error response from daemon: Cannot kill container: ...: No such container: ...
- // Error response from daemon: Cannot kill container: ...: Container ... is not running
- // Error response from podman daemon: can only kill running containers. ... is in state exited
- // Error: No such container: ...
- return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers"))
···
···
"github.com/docker/docker/api/types/image"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
···
func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
···
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
e.l.Info("setting up workflow", "workflow", wid)
+ _, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
e.registerCleanup(wid, func(ctx context.Context) error {
+ return e.docker.NetworkRemove(ctx, networkName(wid))
+ addl := wf.Data.(addlFields)
+ reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
+ e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())
+ return fmt.Errorf("pulling image: %w", err)
+ io.Copy(os.Stdout, reader)
+ resp, err := e.docker.ContainerCreate(ctx, &container.Config{
+ OpenStdin: true, // so cat stays alive :3
+ // TODO(winter): investigate whether environment variables passed here
+ // get propagated to ContainerExec processes
+ }, &container.HostConfig{
+ TmpfsOptions: &mount.TmpfsOptions{
+ Mode: 0o1777, // world-writable with sticky bit
+ CapDrop: []string{"ALL"},
+ CapAdd: []string{"CAP_DAC_OVERRIDE"},
+ SecurityOpt: []string{"no-new-privileges"},
+ ExtraHosts: []string{"host.docker.internal:host-gateway"},
+ return fmt.Errorf("creating container: %w", err)
e.registerCleanup(wid, func(ctx context.Context) error {
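+ // stop, then remove; an empty StopOptions falls back to the daemon's default grace period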
+ err = e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{})
+ return e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{
+ err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
+ return fmt.Errorf("starting container: %w", err)
+ mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{
+ Cmd: []string{"mkdir", "-p", workspaceDir},
+ AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe??
+ AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default")
+ // This actually *starts* the command. Thanks, Docker!
+ execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{})
+ // This is apparently the best way to wait for the command to complete.
+ _, err = io.ReadAll(execResp.Reader)
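+ // hitting EOF means the exec's streams have closed, which should only happen once mkdir has exited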
+ execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
+ if execInspectResp.ExitCode != 0 {
+ return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode)
+ } else if execInspectResp.Running {
+ return errors.New("mkdir is somehow still running??")
+ addl.container = resp.ID
func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error {
+ addl := w.Data.(addlFields)
+ workflowEnvs := ConstructEnvs(addl.env)
+ // TODO(winter): should SetupWorkflow also have secret access?
+ // IMO yes, but probably worth thinking on.
for _, s := range secrets {
workflowEnvs.AddEnv(s.Key, s.Value)
···
envs.AddEnv("HOME", workspaceDir)
e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())
+ mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{
+ Cmd: []string{"bash", "-c", step.command},
+ return fmt.Errorf("creating exec: %w", err)
// start tailing logs in background
tailDone := make(chan error, 1)
+ tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, wid, idx, step)
+ // cleanup will be handled by DestroyWorkflow, since
+ // Docker doesn't provide an API to kill an exec run
+ // (sure, we could grab the PID and kill it ourselves,
+ // but that's wasted effort)
+ e.l.Warn("step timed out", "step", step.Name)
return engine.ErrTimedOut
···
+ execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
+ if execInspectResp.ExitCode != 0 {
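+ // the exec inspect result carries no OOM information, so check the container state instead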
+ inspectResp, err := e.docker.ContainerInspect(ctx, addl.container)
+ e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled)
+ if inspectResp.State.OOMKilled {
+ return engine.ErrWorkflowFailed
+ func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
+ // This actually *starts* the command. Thanks, Docker!
+ logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{})
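+ // without a TTY the attached stream multiplexes stdout and stderr; stdcopy splits them back apart below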
_, err = stdcopy.StdCopy(
wfLogger.DataWriter("stdout"),
wfLogger.DataWriter("stderr"),
if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
return fmt.Errorf("failed to copy logs: %w", err)
···
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
···
e.cleanup[key] = append(e.cleanup[key], fn)
func networkName(wid models.WorkflowId) string {
return fmt.Sprintf("workflow-network-%s", wid)