···
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/image"
···
workspaceDir = "/tangled/workspace"
33
+
type cleanupFunc func(context.Context) error
···
stdoutChans map[string]chan string
stderrChans map[string]chan string
45
+
cleanupMu sync.Mutex
46
+
cleanup map[string][]cleanupFunc
func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) {
···
e.stdoutChans = make(map[string]chan string, 100)
e.stderrChans = make(map[string]chan string, 100)
63
-
// SetupPipeline sets up a new network for the pipeline, and possibly volumes etc.
64
-
// in the future. In here also goes other setup steps.
65
-
func (e *Engine) SetupPipeline(ctx context.Context, pipeline *tangled.Pipeline, atUri, id string) error {
66
-
e.l.Info("setting up pipeline", "pipeline", id)
67
+
e.cleanup = make(map[string][]cleanupFunc)
68
-
_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
69
-
Name: workspaceVolume(id),
76
-
_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
77
-
Name: nixVolume(id),
84
-
_, err = e.docker.NetworkCreate(ctx, pipelineName(id), network.CreateOptions{
91
-
err = e.db.CreatePipeline(id, atUri, e.n)
func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, id string) error {
···
for _, w := range pipeline.Workflows {
83
+
err := e.SetupWorkflow(ctx, id, w.Name)
88
+
defer e.DestroyWorkflow(ctx, id, w.Name)
// TODO: actual checks for image/registry etc.
for _, d := range w.Dependencies {
···
io.Copy(os.Stdout, reader)
130
-
err = e.StartSteps(ctx, w.Steps, id, cimg)
114
+
err = e.StartSteps(ctx, w.Steps, w.Name, id, cimg)
e.l.Error("pipeline failed!", "id", id, "error", err.Error())
return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n)
···
return e.db.MarkPipelineSuccess(id, e.n)
134
+
// SetupWorkflow sets up a new network for the workflow and volumes for
135
+
// the workspace and Nix store. These are persisted across steps and are
136
+
// destroyed at the end of the workflow.
137
+
func (e *Engine) SetupWorkflow(ctx context.Context, id, workflowName string) error {
138
+
e.l.Info("setting up workflow", "pipeline", id, "workflow", workflowName)
140
+
_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
141
+
Name: workspaceVolume(id, workflowName),
147
+
e.registerCleanup(id, workflowName, func(ctx context.Context) error {
148
+
return e.docker.VolumeRemove(ctx, workspaceVolume(id, workflowName), true)
151
+
_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
152
+
Name: nixVolume(id, workflowName),
158
+
e.registerCleanup(id, workflowName, func(ctx context.Context) error {
159
+
return e.docker.VolumeRemove(ctx, nixVolume(id, workflowName), true)
162
+
_, err = e.docker.NetworkCreate(ctx, networkName(id, workflowName), network.CreateOptions{
168
+
e.registerCleanup(id, workflowName, func(ctx context.Context) error {
169
+
return e.docker.NetworkRemove(ctx, networkName(id, workflowName))
// StartSteps starts all steps sequentially with the same base image.
// ONLY marks pipeline as failed if container's exit code is non-zero.
// All other errors are bubbled up.
153
-
func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, id, image string) error {
178
+
func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, workflowName, id, image string) error {
// set up logging channels
if _, exists := e.stdoutChans[id]; !exists {
···
for _, step := range steps {
171
-
hostConfig := hostConfig(id)
196
+
hostConfig := hostConfig(id, workflowName)
resp, err := e.docker.ContainerCreate(ctx, &container.Config{
Cmd: []string{"bash", "-c", step.Command},
···
return fmt.Errorf("creating container: %w", err)
184
-
err = e.docker.NetworkConnect(ctx, pipelineName(id), resp.ID, nil)
209
+
err = e.docker.NetworkConnect(ctx, networkName(id, workflowName), resp.ID, nil)
return fmt.Errorf("connecting network: %w", err)
···
240
+
err = e.DestroyStep(ctx, resp.ID, id)
e.l.Error("pipeline failed!", "id", id, "error", state.Error, "exit_code", state.ExitCode)
return e.db.MarkPipelineFailed(id, state.ExitCode, state.Error, e.n)
···
343
+
func (e *Engine) DestroyStep(ctx context.Context, containerID, pipelineID string) error {
344
+
err := e.docker.ContainerKill(ctx, containerID, syscall.SIGKILL.String())
345
+
if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
349
+
if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
350
+
RemoveVolumes: true,
351
+
RemoveLinks: false,
353
+
}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
360
+
func (e *Engine) DestroyWorkflow(ctx context.Context, pipelineID, workflowName string) error {
362
+
key := fmt.Sprintf("%s-%s", pipelineID, workflowName)
364
+
fns := e.cleanup[key]
365
+
delete(e.cleanup, key)
366
+
e.cleanupMu.Unlock()
368
+
for _, fn := range fns {
369
+
if err := fn(ctx); err != nil {
370
+
e.l.Error("failed to cleanup workflow resource", "pipeline", pipelineID, "workflow", workflowName, "err", err)
func (e *Engine) LogChannels(pipelineID string) (stdout <-chan string, stderr <-chan string, ok bool) {
···
return stdoutCh, stderrCh, true
326
-
func workspaceVolume(id string) string {
327
-
return "workspace-" + id
389
+
func (e *Engine) registerCleanup(pipelineID, workflowName string, fn cleanupFunc) {
391
+
defer e.cleanupMu.Unlock()
393
+
key := fmt.Sprintf("%s-%s", pipelineID, workflowName)
394
+
e.cleanup[key] = append(e.cleanup[key], fn)
397
+
func workspaceVolume(id, name string) string {
398
+
return fmt.Sprintf("workspace-%s-%s", id, name)
330
-
func nixVolume(id string) string {
401
+
func nixVolume(id, name string) string {
402
+
return fmt.Sprintf("nix-%s-%s", id, name)
334
-
func pipelineName(id string) string {
335
-
return "pipeline-" + id
405
+
func networkName(id, name string) string {
406
+
return fmt.Sprintf("workflow-network-%s-%s", id, name)
338
-
func hostConfig(id string) *container.HostConfig {
409
+
func hostConfig(id, name string) *container.HostConfig {
hostConfig := &container.HostConfig{
343
-
Source: workspaceVolume(id),
414
+
Source: workspaceVolume(id, name),
348
-
Source: nixVolume(id),
419
+
Source: nixVolume(id, name),
···
431
+
// thanks woodpecker
432
+
func isErrContainerNotFoundOrNotRunning(err error) bool {
433
+
// Error response from daemon: Cannot kill container: ...: No such container: ...
434
+
// Error response from daemon: Cannot kill container: ...: Container ... is not running"
435
+
// Error response from podman daemon: can only kill running containers. ... is in state exited
436
+
// Error: No such container: ...
437
+
return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers"))