From bf726080696668686796befb87319f655faf450e Mon Sep 17 00:00:00 2001
From: Winter
Date: Sat, 9 Aug 2025 00:04:18 -0400
Subject: [PATCH] spindle/engines/nixery: provision one container per workflow

Change-Id: przvrtsoqzkovszvxuvwkomzpzkuvkrm

This moves away from the old approach of creating a container per step
with some shared volumes, to the model most users would expect: any
changes made in one step are visible to the following steps, not just
changes under the workspace or `/etc/nix`.

This also paves the way for a more generic Docker image engine, since
users can do things like `apt install` without the results being blown
away between steps.

Signed-off-by: Winter
---
 spindle/engines/nixery/engine.go      | 303 ++++++++++----------------
 spindle/engines/nixery/setup_steps.go |   3 +-
 2 files changed, 121 insertions(+), 185 deletions(-)

diff --git a/spindle/engines/nixery/engine.go b/spindle/engines/nixery/engine.go
index 3335dee..467344d 100644
--- a/spindle/engines/nixery/engine.go
+++ b/spindle/engines/nixery/engine.go
@@ -9,7 +9,6 @@ import (
 	"os"
 	"path"
 	"runtime"
-	"strings"
 	"sync"
 	"time"
 
@@ -17,7 +16,6 @@ import (
 	"github.com/docker/docker/api/types/image"
 	"github.com/docker/docker/api/types/mount"
 	"github.com/docker/docker/api/types/network"
-	"github.com/docker/docker/api/types/volume"
 	"github.com/docker/docker/client"
 	"github.com/docker/docker/pkg/stdcopy"
 	"gopkg.in/yaml.v3"
@@ -72,8 +70,9 @@ func (ss *setupSteps) addStep(step models.Step) {
 }
 
 type addlFields struct {
-	image string
-	env   map[string]string
+	image     string
+	container string
+	env       map[string]string
 }
 
 func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
@@ -170,60 +169,123 @@ func New(ctx context.Context, cfg *config.Config) (*Engine, error) {
 	return e, nil
 }
 
-// SetupWorkflow sets up a new network for the workflow and volumes for
-// the workspace and Nix store. These are persisted across steps and are
-// destroyed at the end of the workflow.
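+// SetupWorkflow creates the workflow's bridge network and starts a
+// single long-lived container that every step execs into. Both are
+// removed by the registered cleanups when the workflow is destroyed.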
 func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
 	e.l.Info("setting up workflow", "workflow", wid)
 
-	_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
-		Name:   workspaceVolume(wid),
-		Driver: "local",
+	_, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
+		Driver: "bridge",
 	})
 	if err != nil {
 		return err
 	}
 	e.registerCleanup(wid, func(ctx context.Context) error {
-		return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
+		return e.docker.NetworkRemove(ctx, networkName(wid))
 	})
 
-	_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
-		Name:   nixVolume(wid),
-		Driver: "local",
-	})
+	addl := wf.Data.(addlFields)
+
+	reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
 	if err != nil {
-		return err
+		e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())
+
+		return fmt.Errorf("pulling image: %w", err)
+	}
+	defer reader.Close()
+	io.Copy(os.Stdout, reader)
+
+	resp, err := e.docker.ContainerCreate(ctx, &container.Config{
+		Image:     addl.image,
+		Cmd:       []string{"cat"},
+		OpenStdin: true, // so cat stays alive :3
+		Tty:       false,
+		Hostname:  "spindle",
+		// TODO(winter): investigate whether environment variables passed here
+		// get propagated to ContainerExec processes
+	}, &container.HostConfig{
+		Mounts: []mount.Mount{
+			{
+				Type:     mount.TypeTmpfs,
+				Target:   "/tmp",
+				ReadOnly: false,
+				TmpfsOptions: &mount.TmpfsOptions{
+					Mode: 0o1777, // world-writable, sticky bit
+					Options: [][]string{
+						{"exec"},
+					},
+				},
+			},
+		},
+		ReadonlyRootfs: false,
+		CapDrop:        []string{"ALL"},
+		CapAdd:         []string{"CAP_DAC_OVERRIDE"},
+		SecurityOpt:    []string{"no-new-privileges"},
+		ExtraHosts:     []string{"host.docker.internal:host-gateway"},
+	}, nil, nil, "")
+	if err != nil {
+		return fmt.Errorf("creating container: %w", err)
 	}
 	e.registerCleanup(wid, func(ctx context.Context) error {
-		return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
+		err = e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{})
+		if err != nil {
+			return err
+		}
+
+		return e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{
+			RemoveVolumes: true,
+			RemoveLinks:   false,
+			Force:         false,
+		})
 	})
 
-	_, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
-		Driver: "bridge",
+	err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
+	if err != nil {
+		return fmt.Errorf("starting container: %w", err)
+	}
+
+	mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{
+		Cmd:          []string{"mkdir", "-p", workspaceDir},
+		AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe??
+		AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default")
 	})
 	if err != nil {
 		return err
 	}
-	e.registerCleanup(wid, func(ctx context.Context) error {
-		return e.docker.NetworkRemove(ctx, networkName(wid))
-	})
-
-	addl := wf.Data.(addlFields)
+	// This actually *starts* the command. Thanks, Docker!
+	execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{})
+	if err != nil {
+		return err
+	}
+	defer execResp.Close()
 
-	reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
+	// This is apparently the best way to wait for the command to complete.
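+	// (No TTY is allocated, so the daemon should close the attached
+	// stream once the process exits, unblocking the ReadAll below.)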
+	_, err = io.ReadAll(execResp.Reader)
 	if err != nil {
-		e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())
+		return err
+	}
 
-		return fmt.Errorf("pulling image: %w", err)
+	execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
+	if err != nil {
+		return err
 	}
-	defer reader.Close()
-	io.Copy(os.Stdout, reader)
+
+	if execInspectResp.ExitCode != 0 {
+		return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode)
+	} else if execInspectResp.Running {
+		return errors.New("mkdir is somehow still running??")
+	}
+
+	addl.container = resp.ID
+	wf.Data = addl
 
 	return nil
 }
 
 func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error {
-	workflowEnvs := ConstructEnvs(w.Data.(addlFields).env)
+	addl := w.Data.(addlFields)
+	workflowEnvs := ConstructEnvs(addl.env)
+	// TODO(winter): should SetupWorkflow also have secret access?
+	// IMO yes, but probably worth thinking on.
 	for _, s := range secrets {
 		workflowEnvs.AddEnv(s.Key, s.Value)
 	}
@@ -243,62 +305,31 @@ func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.W
 	envs.AddEnv("HOME", workspaceDir)
 
 	e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())
-	hostConfig := hostConfig(wid)
 
-	resp, err := e.docker.ContainerCreate(ctx, &container.Config{
-		Image:      w.Data.(addlFields).image,
-		Cmd:        []string{"bash", "-c", step.command},
-		WorkingDir: workspaceDir,
-		Tty:        false,
-		Hostname:   "spindle",
-		Env:        envs.Slice(),
-	}, hostConfig, nil, nil, "")
-	defer e.DestroyStep(ctx, resp.ID)
-	if err != nil {
-		return fmt.Errorf("creating container: %w", err)
-	}
-
-	err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
+	mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{
+		Cmd:          []string{"bash", "-c", step.command},
+		AttachStdout: true,
+		AttachStderr: true,
+	})
 	if err != nil {
-		return fmt.Errorf("connecting network: %w", err)
+		return fmt.Errorf("creating exec: %w", err)
 	}
 
-	err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
-	if err != nil {
-		return err
-	}
-	e.l.Info("started container", "name", resp.ID, "step", step.Name)
-
 	// start tailing logs in background
 	tailDone := make(chan error, 1)
 	go func() {
-		tailDone <- e.tailStep(ctx, wfLogger, resp.ID, wid, idx, step)
-	}()
-
-	// wait for container completion or timeout
-	waitDone := make(chan struct{})
-	var state *container.State
-	var waitErr error
-
-	go func() {
-		defer close(waitDone)
-		state, waitErr = e.WaitStep(ctx, resp.ID)
+		tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, wid, idx, step)
 	}()
 
 	select {
-	case <-waitDone:
-
-		// wait for tailing to complete
-		<-tailDone
+	case <-tailDone:
 	case <-ctx.Done():
-		e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name)
-		err = e.DestroyStep(context.Background(), resp.ID)
-		if err != nil {
-			e.l.Error("failed to destroy step", "container", resp.ID, "error", err)
-		}
+		// cleanup will be handled by DestroyWorkflow, since
+		// Docker doesn't provide an API to kill an exec run
+		// (sure, we could grab the PID and kill it ourselves,
+		// but that's wasted effort)
+		e.l.Warn("step timed out", "step", step.Name)
 
-		// wait for both goroutines to finish
-		<-waitDone
 		<-tailDone
 
 		return engine.ErrTimedOut
 	}
 
 	default:
 	}
 
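+	// tailStep returns once the exec's output stream hits EOF, meaning
+	// the step process should have exited; its exit code is checked below.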
-	if waitErr != nil {
-		return waitErr
-	}
-
-	err = e.DestroyStep(ctx, resp.ID)
+	execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
 	if err != nil {
 		return err
 	}
 
-	if state.ExitCode != 0 {
-		e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
-		if state.OOMKilled {
-			return ErrOOMKilled
-		}
-		return engine.ErrWorkflowFailed
-	}
-
-	return nil
-}
-
-func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
-	wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
-	select {
-	case err := <-errCh:
+	if execInspectResp.ExitCode != 0 {
+		inspectResp, err := e.docker.ContainerInspect(ctx, addl.container)
 		if err != nil {
-			return nil, err
+			return err
 		}
-	case <-wait:
-	}
-	e.l.Info("waited for container", "name", containerID)
+		e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled)
 
-	info, err := e.docker.ContainerInspect(ctx, containerID)
-	if err != nil {
-		return nil, err
+		if inspectResp.State.OOMKilled {
+			return ErrOOMKilled
+		}
+		return engine.ErrWorkflowFailed
 	}
 
-	return info.State, nil
+	return nil
 }
 
-func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
+func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
 	if wfLogger == nil {
 		return nil
 	}
 
-	logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
-		Follow:     true,
-		ShowStdout: true,
-		ShowStderr: true,
-		Details:    false,
-		Timestamps: false,
-	})
+	// This actually *starts* the command. Thanks, Docker!
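+	// Attaching is also how the exec's output is obtained; the
+	// multiplexed stream is split into stdout/stderr by stdcopy below.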
+	logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{})
 	if err != nil {
 		return err
 	}
+	defer logs.Close()
 
 	_, err = stdcopy.StdCopy(
 		wfLogger.DataWriter("stdout"),
 		wfLogger.DataWriter("stderr"),
-		logs,
+		logs.Reader,
 	)
 	if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
 		return fmt.Errorf("failed to copy logs: %w", err)
@@ -378,23 +387,6 @@ func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger,
 	return nil
 }
 
-func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
-	err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
-	if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
-		return err
-	}
-
-	if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
-		RemoveVolumes: true,
-		RemoveLinks:   false,
-		Force:         false,
-	}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
-		return err
-	}
-
-	return nil
-}
-
 func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
 	e.cleanupMu.Lock()
 	key := wid.String()
@@ -419,63 +411,6 @@ func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
 	e.cleanup[key] = append(e.cleanup[key], fn)
 }
 
-func workspaceVolume(wid models.WorkflowId) string {
-	return fmt.Sprintf("workspace-%s", wid)
-}
-
-func nixVolume(wid models.WorkflowId) string {
-	return fmt.Sprintf("nix-%s", wid)
-}
-
 func networkName(wid models.WorkflowId) string {
 	return fmt.Sprintf("workflow-network-%s", wid)
 }
-
-func hostConfig(wid models.WorkflowId) *container.HostConfig {
-	hostConfig := &container.HostConfig{
-		Mounts: []mount.Mount{
-			{
-				Type:   mount.TypeVolume,
-				Source: workspaceVolume(wid),
-				Target: workspaceDir,
-			},
-			{
-				Type:   mount.TypeVolume,
-				Source: nixVolume(wid),
-				Target: "/nix",
-			},
-			{
-				Type:     mount.TypeTmpfs,
-				Target:   "/tmp",
-				ReadOnly: false,
-				TmpfsOptions: &mount.TmpfsOptions{
-					Mode: 0o1777, // world-writeable sticky bit
-					Options: [][]string{
-						{"exec"},
-					},
-				},
-			},
-			{
-				Type:   mount.TypeVolume,
-				Source: "etc-nix-" + wid.String(),
-				Target: "/etc/nix",
-			},
-		},
-		ReadonlyRootfs: false,
-		CapDrop:        []string{"ALL"},
-		CapAdd:         []string{"CAP_DAC_OVERRIDE"},
-		SecurityOpt:    []string{"no-new-privileges"},
-		ExtraHosts:     []string{"host.docker.internal:host-gateway"},
-	}
-
-	return hostConfig
-}
-
-// thanks woodpecker
-func isErrContainerNotFoundOrNotRunning(err error) bool {
-	// Error response from daemon: Cannot kill container: ...: No such container: ...
-	// Error response from daemon: Cannot kill container: ...: Container ... is not running"
-	// Error response from podman daemon: can only kill running containers. ... is in state exited
-	// Error: No such container: ...
-	return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers"))
-}
diff --git a/spindle/engines/nixery/setup_steps.go b/spindle/engines/nixery/setup_steps.go
index a502b5f..e74ef9a 100644
--- a/spindle/engines/nixery/setup_steps.go
+++ b/spindle/engines/nixery/setup_steps.go
@@ -10,7 +10,8 @@ import (
 )
 
 func nixConfStep() Step {
-	setupCmd := `echo 'extra-experimental-features = nix-command flakes' >> /etc/nix/nix.conf
+	setupCmd := `mkdir -p /etc/nix
+echo 'extra-experimental-features = nix-command flakes' >> /etc/nix/nix.conf
 echo 'build-users-group = ' >> /etc/nix/nix.conf`
 	return Step{
 		command: setupCmd,
-- 
2.43.0