···
···
"github.com/docker/docker/api/types/image"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/network"
-"github.com/docker/docker/api/types/volume"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
···
-env map[string]string
+env map[string]string
func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
···
-// SetupWorkflow sets up a new network for the workflow and volumes for
-// the workspace and Nix store. These are persisted across steps and are
-// destroyed at the end of the workflow.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
e.l.Info("setting up workflow", "workflow", wid)
-_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
-Name: workspaceVolume(wid),
+_, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
e.registerCleanup(wid, func(ctx context.Context) error {
-return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
+return e.docker.NetworkRemove(ctx, networkName(wid))
-_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
-Name: nixVolume(wid),
+addl := wf.Data.(addlFields)
+reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
+e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())
+return fmt.Errorf("pulling image: %w", err)
+defer reader.Close()
+io.Copy(os.Stdout, reader)
+resp, err := e.docker.ContainerCreate(ctx, &container.Config{
+Cmd: []string{"cat"},
+OpenStdin: true, // so cat stays alive :3
+Hostname: "spindle",
+// TODO(winter): investigate whether environment variables passed here
+// get propagated to ContainerExec processes
+}, &container.HostConfig{
+Mounts: []mount.Mount{
+Type: mount.TypeTmpfs,
+TmpfsOptions: &mount.TmpfsOptions{
+Mode: 0o1777, // world-writeable sticky bit
+Options: [][]string{
+ReadonlyRootfs: false,
+CapDrop: []string{"ALL"},
+CapAdd: []string{"CAP_DAC_OVERRIDE"},
+SecurityOpt: []string{"no-new-privileges"},
+ExtraHosts: []string{"host.docker.internal:host-gateway"},
+return fmt.Errorf("creating container: %w", err)
e.registerCleanup(wid, func(ctx context.Context) error {
-return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
+err = e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{})
+return e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{
+RemoveVolumes: true,
+RemoveLinks: false,
-_, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
+err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
+return fmt.Errorf("starting container: %w", err)
+mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{
+Cmd: []string{"mkdir", "-p", workspaceDir},
+AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe??
+AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default")
-e.registerCleanup(wid, func(ctx context.Context) error {
-return e.docker.NetworkRemove(ctx, networkName(wid))
-addl := wf.Data.(addlFields)
+// This actually *starts* the command. Thanks, Docker!
+execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{})
+defer execResp.Close()
-reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
+// This is apparently the best way to wait for the command to complete.
+_, err = io.ReadAll(execResp.Reader)
-e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())
-return fmt.Errorf("pulling image: %w", err)
+execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
-defer reader.Close()
-io.Copy(os.Stdout, reader)
+if execInspectResp.ExitCode != 0 {
+return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode)
+} else if execInspectResp.Running {
+return errors.New("mkdir is somehow still running??")
+addl.container = resp.ID
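For readers unfamiliar with the Docker exec lifecycle, the mkdir sequence above follows a general pattern: ContainerExecCreate defines the command, ContainerExecAttach starts it, draining the attached stream until EOF waits for it to exit, and ContainerExecInspect retrieves the exit code. Here is a minimal, self-contained sketch of that pattern — the helper name and error messages are illustrative, not from this patch:

```go
import (
	"context"
	"fmt"
	"io"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/client"
)

// execAndWait is a hypothetical helper showing the exec lifecycle:
// create the exec, attach (which starts it), drain output until EOF
// (i.e. until the process exits), then inspect for the exit code.
func execAndWait(ctx context.Context, cli *client.Client, containerID string, cmd []string) error {
	createResp, err := cli.ContainerExecCreate(ctx, containerID, container.ExecOptions{
		Cmd:          cmd,
		AttachStdout: true, // at least one stream must be attached,
		AttachStderr: true, // or there is nothing to drain below
	})
	if err != nil {
		return fmt.Errorf("creating exec: %w", err)
	}

	attach, err := cli.ContainerExecAttach(ctx, createResp.ID, container.ExecAttachOptions{})
	if err != nil {
		return fmt.Errorf("attaching exec: %w", err)
	}
	defer attach.Close()

	// EOF here means the process has exited.
	if _, err := io.ReadAll(attach.Reader); err != nil {
		return fmt.Errorf("draining output: %w", err)
	}

	inspect, err := cli.ContainerExecInspect(ctx, createResp.ID)
	if err != nil {
		return fmt.Errorf("inspecting exec: %w", err)
	}
	if inspect.ExitCode != 0 {
		return fmt.Errorf("command exited with code %d", inspect.ExitCode)
	}
	return nil
}
```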
func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error {
-workflowEnvs := ConstructEnvs(w.Data.(addlFields).env)
+addl := w.Data.(addlFields)
+workflowEnvs := ConstructEnvs(addl.env)
+// TODO(winter): should SetupWorkflow also have secret access?
+// IMO yes, but probably worth thinking on.
for _, s := range secrets {
workflowEnvs.AddEnv(s.Key, s.Value)
···
envs.AddEnv("HOME", workspaceDir)
e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())
-hostConfig := hostConfig(wid)
-resp, err := e.docker.ContainerCreate(ctx, &container.Config{
-Image: w.Data.(addlFields).image,
-Cmd: []string{"bash", "-c", step.command},
-WorkingDir: workspaceDir,
-Hostname: "spindle",
-}, hostConfig, nil, nil, "")
-defer e.DestroyStep(ctx, resp.ID)
-return fmt.Errorf("creating container: %w", err)
-err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
+mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{
+Cmd: []string{"bash", "-c", step.command},
+AttachStdout: true,
+AttachStderr: true,
-return fmt.Errorf("connecting network: %w", err)
+return fmt.Errorf("creating exec: %w", err)
-err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
-e.l.Info("started container", "name", resp.ID, "step", step.Name)
// start tailing logs in background
tailDone := make(chan error, 1)
-tailDone <- e.TailStep(ctx, wfLogger, resp.ID, wid, idx, step)
-// wait for container completion or timeout
-waitDone := make(chan struct{})
-var state *container.State
-defer close(waitDone)
-state, waitErr = e.WaitStep(ctx, resp.ID)
+tailDone <- e.TailStep(ctx, wfLogger, mkExecResp.ID, wid, idx, step)
-// wait for tailing to complete
-e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name)
-err = e.DestroyStep(context.Background(), resp.ID)
-e.l.Error("failed to destroy step", "container", resp.ID, "error", err)
+// cleanup will be handled by DestroyWorkflow, since
+// Docker doesn't provide an API to kill an exec run
+// (sure, we could grab the PID and kill it ourselves,
+// but that's wasted effort)
+e.l.Warn("step timed out", "step", step.Name)
-// wait for both goroutines to finish
return engine.ErrTimedOut
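The timeout branch above leans on a detail worth spelling out: since Docker exposes no API to kill a running exec, the select can only give up and let the workflow-level teardown stop the shared container. A distilled sketch of that shape — the function and variable names here are illustrative:

```go
import (
	"errors"
	"time"
)

// waitStepWithTimeout mirrors the select in RunStep: the tailer goroutine
// finishes when the exec's output stream hits EOF; if the deadline fires
// first, the exec is abandoned and the container is reaped later by the
// workflow-level cleanup.
func waitStepWithTimeout(tailDone <-chan error, timeout time.Duration) error {
	select {
	case err := <-tailDone:
		return err // step finished (possibly with a tailing error)
	case <-time.After(timeout):
		return errors.New("step timed out")
	}
}
```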
···
-if waitErr != nil {
-err = e.DestroyStep(ctx, resp.ID)
+execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
-if state.ExitCode != 0 {
-e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
-if state.OOMKilled {
-return ErrOOMKilled
-return engine.ErrWorkflowFailed
-func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
-wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
-case err := <-errCh:
+if execInspectResp.ExitCode != 0 {
+inspectResp, err := e.docker.ContainerInspect(ctx, addl.container)
-e.l.Info("waited for container", "name", containerID)
+e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled)
-info, err := e.docker.ContainerInspect(ctx, containerID)
+if inspectResp.State.OOMKilled {
+return ErrOOMKilled
+return engine.ErrWorkflowFailed
-return info.State, nil
-func (e *Engine) TailStep(ctx context.Context, wfLogger *models.WorkflowLogger, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
+func (e *Engine) TailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
-logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
+// This actually *starts* the command. Thanks, Docker!
+logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{})
_, err = stdcopy.StdCopy(
wfLogger.DataWriter("stdout"),
wfLogger.DataWriter("stderr"),
if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
return fmt.Errorf("failed to copy logs: %w", err)
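One subtlety in the tailing code: because the exec is created without a TTY, the hijacked connection carries stdout and stderr multiplexed into a single stream, and stdcopy.StdCopy is what splits the frames back apart. A minimal sketch, assuming a non-TTY exec and an already-connected client (names are illustrative):

```go
import (
	"context"
	"io"
	"os"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
)

// streamExecOutput demultiplexes a non-TTY exec stream into two writers.
func streamExecOutput(ctx context.Context, cli *client.Client, execID string) error {
	attach, err := cli.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{})
	if err != nil {
		return err
	}
	defer attach.Close()

	// StdCopy reads the multiplexed frames and routes each payload
	// to the matching writer until EOF.
	if _, err := stdcopy.StdCopy(os.Stdout, os.Stderr, attach.Reader); err != nil && err != io.EOF {
		return err
	}
	return nil
}
```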
···
-func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
-err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
-if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
-if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
-RemoveVolumes: true,
-RemoveLinks: false,
-}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
···
e.cleanup[key] = append(e.cleanup[key], fn)
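The cleanup registry that SetupWorkflow keeps appending to is worth a standalone illustration: closures are registered per workflow during setup, then drained during teardown. A simplified, self-contained sketch under assumed types — the engine's real bookkeeping differs:

```go
import (
	"context"
	"errors"
	"sync"
)

// cleanupRegistry is a simplified stand-in for the engine's per-workflow
// cleanup bookkeeping.
type cleanupRegistry struct {
	mu      sync.Mutex
	cleanup map[string][]func(ctx context.Context) error
}

func (r *cleanupRegistry) register(key string, fn func(ctx context.Context) error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.cleanup == nil {
		r.cleanup = make(map[string][]func(ctx context.Context) error)
	}
	r.cleanup[key] = append(r.cleanup[key], fn)
}

// destroy runs every registered cleanup, collecting errors rather than
// stopping at the first one, then forgets the key.
func (r *cleanupRegistry) destroy(ctx context.Context, key string) error {
	r.mu.Lock()
	fns := r.cleanup[key]
	delete(r.cleanup, key)
	r.mu.Unlock()

	var errs []error
	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}
```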
-func workspaceVolume(wid models.WorkflowId) string {
-return fmt.Sprintf("workspace-%s", wid)
-func nixVolume(wid models.WorkflowId) string {
-return fmt.Sprintf("nix-%s", wid)
func networkName(wid models.WorkflowId) string {
return fmt.Sprintf("workflow-network-%s", wid)
-func hostConfig(wid models.WorkflowId) *container.HostConfig {
-hostConfig := &container.HostConfig{
-Mounts: []mount.Mount{
-Type: mount.TypeVolume,
-Source: workspaceVolume(wid),
-Target: workspaceDir,
-Type: mount.TypeVolume,
-Source: nixVolume(wid),
-Type: mount.TypeTmpfs,
-TmpfsOptions: &mount.TmpfsOptions{
-Mode: 0o1777, // world-writeable sticky bit
-Options: [][]string{
-Type: mount.TypeVolume,
-Source: "etc-nix-" + wid.String(),
-Target: "/etc/nix",
-ReadonlyRootfs: false,
-CapDrop: []string{"ALL"},
-CapAdd: []string{"CAP_DAC_OVERRIDE"},
-SecurityOpt: []string{"no-new-privileges"},
-ExtraHosts: []string{"host.docker.internal:host-gateway"},
-// thanks woodpecker
-func isErrContainerNotFoundOrNotRunning(err error) bool {
-// Error response from daemon: Cannot kill container: ...: No such container: ...
-// Error response from daemon: Cannot kill container: ...: Container ... is not running"
-// Error response from podman daemon: can only kill running containers. ... is in state exited
-// Error: No such container: ...
-return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers"))