spindle/engines/nixery: provision one container per workflow #427

merged
opened by winter.bsky.social targeting master from winter.bsky.social/core: push-luoyqwkpromz

This moves away from the old approach of creating a container per step with a handful of shared volumes, to the behavior most users would expect: any changes made in one step are visible to the following steps, not just changes inside the workspace or /etc/nix. This also paves the way for a more generic Docker image engine, as users can do things like apt install without the results being blown away between steps.

Signed-off-by: Winter <winter@winter.cafe>
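For readers unfamiliar with the exec-based flow this introduces, here is a minimal sketch of the pattern (not the PR's code): one long-lived container is created per workflow and kept alive, and each step is then driven through Docker's exec API, so its filesystem changes persist for later steps. The package and helper names (sketch, runStep) and the bare SDK client are illustrative assumptions only; the attach-then-inspect exit-code handling mirrors what the diff below does.

// Sketch only: run one workflow step inside an already-running container via exec.
package sketch

import (
	"context"
	"fmt"
	"io"
	"os"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
)

func runStep(ctx context.Context, cli *client.Client, containerID, command string) error {
	// Create an exec in the long-lived container; whatever earlier steps wrote
	// to the container's filesystem is still there.
	execCreate, err := cli.ContainerExecCreate(ctx, containerID, container.ExecOptions{
		Cmd:          []string{"bash", "-c", command},
		AttachStdout: true,
		AttachStderr: true,
	})
	if err != nil {
		return fmt.Errorf("creating exec: %w", err)
	}

	// Attaching is what actually starts the exec process.
	attach, err := cli.ContainerExecAttach(ctx, execCreate.ID, container.ExecAttachOptions{})
	if err != nil {
		return fmt.Errorf("attaching to exec: %w", err)
	}
	defer attach.Close()

	// Demultiplex the combined stream; StdCopy returns once the exec's output ends.
	if _, err := stdcopy.StdCopy(os.Stdout, os.Stderr, attach.Reader); err != nil && err != io.EOF {
		return fmt.Errorf("copying logs: %w", err)
	}

	// The exit code is only available by inspecting the exec afterwards.
	inspect, err := cli.ContainerExecInspect(ctx, execCreate.ID)
	if err != nil {
		return err
	}
	if inspect.ExitCode != 0 {
		return fmt.Errorf("step exited with code %d", inspect.ExitCode)
	}
	return nil
}

Compared with the old per-step containers, nothing tears the environment down between steps; cleanup happens once, when the workflow's container is stopped and removed.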

Changed files (+121 -185)
spindle/engines/nixery/engine.go (+119 -184)
···
"os"
"path"
"runtime"
- "strings"
"sync"
"time"
···
"github.com/docker/docker/api/types/image"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/network"
- "github.com/docker/docker/api/types/volume"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
"gopkg.in/yaml.v3"
···
}
type addlFields struct {
- image string
- env map[string]string
+ image string
+ container string
+ env map[string]string
}
func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
···
return e, nil
}
- // SetupWorkflow sets up a new network for the workflow and volumes for
- // the workspace and Nix store. These are persisted across steps and are
- // destroyed at the end of the workflow.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
e.l.Info("setting up workflow", "workflow", wid)
- _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
- Name: workspaceVolume(wid),
- Driver: "local",
+ _, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
+ Driver: "bridge",
})
if err != nil {
return err
}
e.registerCleanup(wid, func(ctx context.Context) error {
- return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
+ return e.docker.NetworkRemove(ctx, networkName(wid))
})
- _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
- Name: nixVolume(wid),
- Driver: "local",
- })
+ addl := wf.Data.(addlFields)
+
+ reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
if err != nil {
- return err
+ e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())
+
+ return fmt.Errorf("pulling image: %w", err)
+ }
+ defer reader.Close()
+ io.Copy(os.Stdout, reader)
+
+ resp, err := e.docker.ContainerCreate(ctx, &container.Config{
+ Image: addl.image,
+ Cmd: []string{"cat"},
+ OpenStdin: true, // so cat stays alive :3
+ Tty: false,
+ Hostname: "spindle",
+ // TODO(winter): investigate whether environment variables passed here
+ // get propagated to ContainerExec processes
+ }, &container.HostConfig{
+ Mounts: []mount.Mount{
+ {
+ Type: mount.TypeTmpfs,
+ Target: "/tmp",
+ ReadOnly: false,
+ TmpfsOptions: &mount.TmpfsOptions{
+ Mode: 0o1777, // world-writeable sticky bit
+ Options: [][]string{
+ {"exec"},
+ },
+ },
+ },
+ },
+ ReadonlyRootfs: false,
+ CapDrop: []string{"ALL"},
+ CapAdd: []string{"CAP_DAC_OVERRIDE"},
+ SecurityOpt: []string{"no-new-privileges"},
+ ExtraHosts: []string{"host.docker.internal:host-gateway"},
+ }, nil, nil, "")
+ if err != nil {
+ return fmt.Errorf("creating container: %w", err)
}
e.registerCleanup(wid, func(ctx context.Context) error {
- return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
+ err = e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{})
+ if err != nil {
+ return err
+ }
+
+ return e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{
+ RemoveVolumes: true,
+ RemoveLinks: false,
+ Force: false,
+ })
})
- _, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
- Driver: "bridge",
+ err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
+ if err != nil {
+ return fmt.Errorf("starting container: %w", err)
+ }
+
+ mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{
+ Cmd: []string{"mkdir", "-p", workspaceDir},
+ AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe??
+ AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default")
})
if err != nil {
return err
}
- e.registerCleanup(wid, func(ctx context.Context) error {
- return e.docker.NetworkRemove(ctx, networkName(wid))
- })
- addl := wf.Data.(addlFields)
+ // This actually *starts* the command. Thanks, Docker!
+ execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{})
+ if err != nil {
+ return err
+ }
+ defer execResp.Close()
- reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
+ // This is apparently best way to wait for the command to complete.
+ _, err = io.ReadAll(execResp.Reader)
if err != nil {
- e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())
+ return err
+ }
- return fmt.Errorf("pulling image: %w", err)
+ execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
+ if err != nil {
+ return err
}
- defer reader.Close()
- io.Copy(os.Stdout, reader)
+
+ if execInspectResp.ExitCode != 0 {
+ return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode)
+ } else if execInspectResp.Running {
+ return errors.New("mkdir is somehow still running??")
+ }
+
+ addl.container = resp.ID
+ wf.Data = addl
return nil
}
func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error {
- workflowEnvs := ConstructEnvs(w.Data.(addlFields).env)
+ addl := w.Data.(addlFields)
+ workflowEnvs := ConstructEnvs(addl.env)
+ // TODO(winter): should SetupWorkflow also have secret access?
+ // IMO yes, but probably worth thinking on.
for _, s := range secrets {
workflowEnvs.AddEnv(s.Key, s.Value)
}
···
envs.AddEnv("HOME", workspaceDir)
e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())
- hostConfig := hostConfig(wid)
- resp, err := e.docker.ContainerCreate(ctx, &container.Config{
- Image: w.Data.(addlFields).image,
- Cmd: []string{"bash", "-c", step.command},
- WorkingDir: workspaceDir,
- Tty: false,
- Hostname: "spindle",
- Env: envs.Slice(),
- }, hostConfig, nil, nil, "")
- defer e.DestroyStep(ctx, resp.ID)
- if err != nil {
- return fmt.Errorf("creating container: %w", err)
- }
-
- err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
+ mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{
+ Cmd: []string{"bash", "-c", step.command},
+ AttachStdout: true,
+ AttachStderr: true,
+ })
if err != nil {
- return fmt.Errorf("connecting network: %w", err)
+ return fmt.Errorf("creating exec: %w", err)
}
- err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
- if err != nil {
- return err
- }
- e.l.Info("started container", "name", resp.ID, "step", step.Name)
- // start tailing logs in background
tailDone := make(chan error, 1)
go func() {
- tailDone <- e.tailStep(ctx, wfLogger, resp.ID, wid, idx, step)
- }()
-
- // wait for container completion or timeout
- waitDone := make(chan struct{})
- var state *container.State
- var waitErr error
-
- go func() {
- defer close(waitDone)
- state, waitErr = e.WaitStep(ctx, resp.ID)
+ tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, wid, idx, step)
}()
select {
- case <-waitDone:
-
- // wait for tailing to complete
- <-tailDone
+ case <-tailDone:
case <-ctx.Done():
- e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name)
- err = e.DestroyStep(context.Background(), resp.ID)
- if err != nil {
- e.l.Error("failed to destroy step", "container", resp.ID, "error", err)
- }
+ // cleanup will be handled by DestroyWorkflow, since
+ // Docker doesn't provide an API to kill an exec run
+ // (sure, we could grab the PID and kill it ourselves,
+ // but that's wasted effort)
+ e.l.Warn("step timed out", "step", step.Name)
- // wait for both goroutines to finish
- <-waitDone
<-tailDone
return engine.ErrTimedOut
···
default:
}
- if waitErr != nil {
- return waitErr
- }
-
- err = e.DestroyStep(ctx, resp.ID)
+ execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
if err != nil {
return err
}
- if state.ExitCode != 0 {
- e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
- if state.OOMKilled {
- return ErrOOMKilled
- }
- return engine.ErrWorkflowFailed
- }
-
- return nil
- }
-
- func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
- wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
- select {
- case err := <-errCh:
+ if execInspectResp.ExitCode != 0 {
+ inspectResp, err := e.docker.ContainerInspect(ctx, addl.container)
if err != nil {
- return nil, err
+ return err
}
- case <-wait:
- }
- e.l.Info("waited for container", "name", containerID)
+ e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled)
- info, err := e.docker.ContainerInspect(ctx, containerID)
- if err != nil {
- return nil, err
+ if inspectResp.State.OOMKilled {
+ return ErrOOMKilled
+ }
+ return engine.ErrWorkflowFailed
}
- return info.State, nil
+ return nil
}
- func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
+ func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
if wfLogger == nil {
return nil
}
- logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
- Follow: true,
- ShowStdout: true,
- ShowStderr: true,
- Details: false,
- Timestamps: false,
- })
+ // This actually *starts* the command. Thanks, Docker!
+ logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{})
if err != nil {
return err
}
+ defer logs.Close()
_, err = stdcopy.StdCopy(
wfLogger.DataWriter("stdout"),
wfLogger.DataWriter("stderr"),
- logs,
+ logs.Reader,
)
if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
return fmt.Errorf("failed to copy logs: %w", err)
···
return nil
}
- func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
- err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
- if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
- return err
- }
-
- if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
- RemoveVolumes: true,
- RemoveLinks: false,
- Force: false,
- }); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
- return err
- }
-
- return nil
- }
-
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
e.cleanupMu.Lock()
key := wid.String()
···
e.cleanup[key] = append(e.cleanup[key], fn)
}
- func workspaceVolume(wid models.WorkflowId) string {
- return fmt.Sprintf("workspace-%s", wid)
- }
-
- func nixVolume(wid models.WorkflowId) string {
- return fmt.Sprintf("nix-%s", wid)
- }
-
func networkName(wid models.WorkflowId) string {
return fmt.Sprintf("workflow-network-%s", wid)
}
-
- func hostConfig(wid models.WorkflowId) *container.HostConfig {
- hostConfig := &container.HostConfig{
- Mounts: []mount.Mount{
- {
- Type: mount.TypeVolume,
- Source: workspaceVolume(wid),
- Target: workspaceDir,
- },
- {
- Type: mount.TypeVolume,
- Source: nixVolume(wid),
- Target: "/nix",
- },
- {
- Type: mount.TypeTmpfs,
- Target: "/tmp",
- ReadOnly: false,
- TmpfsOptions: &mount.TmpfsOptions{
- Mode: 0o1777, // world-writeable sticky bit
- Options: [][]string{
- {"exec"},
- },
- },
- },
- {
- Type: mount.TypeVolume,
- Source: "etc-nix-" + wid.String(),
- Target: "/etc/nix",
- },
- },
- ReadonlyRootfs: false,
- CapDrop: []string{"ALL"},
- CapAdd: []string{"CAP_DAC_OVERRIDE"},
- SecurityOpt: []string{"no-new-privileges"},
- ExtraHosts: []string{"host.docker.internal:host-gateway"},
- }
-
- return hostConfig
- }
-
- // thanks woodpecker
- func isErrContainerNotFoundOrNotRunning(err error) bool {
- // Error response from daemon: Cannot kill container: ...: No such container: ...
- // Error response from daemon: Cannot kill container: ...: Container ... is not running"
- // Error response from podman daemon: can only kill running containers. ... is in state exited
- // Error: No such container: ...
- return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers"))
- }
spindle/engines/nixery/setup_steps.go (+2 -1)
···
)
func nixConfStep() Step {
- setupCmd := `echo 'extra-experimental-features = nix-command flakes' >> /etc/nix/nix.conf
+ setupCmd := `mkdir -p /etc/nix
+ echo 'extra-experimental-features = nix-command flakes' >> /etc/nix/nix.conf
echo 'build-users-group = ' >> /etc/nix/nix.conf`
return Step{
command: setupCmd,