···
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/image"
···
workspaceDir = "/tangled/workspace"
···
stdoutChans map[string]chan string
stderrChans map[string]chan string
func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) {
···
e.stdoutChans = make(map[string]chan string, 100)
e.stderrChans = make(map[string]chan string, 100)
-
// SetupPipeline sets up a new network for the pipeline, and possibly volumes etc.
-
// in the future. In here also goes other setup steps.
-
func (e *Engine) SetupPipeline(ctx context.Context, pipeline *tangled.Pipeline, atUri, id string) error {
-
e.l.Info("setting up pipeline", "pipeline", id)
-
_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
-
Name: workspaceVolume(id),
-
_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
-
_, err = e.docker.NetworkCreate(ctx, pipelineName(id), network.CreateOptions{
-
err = e.db.CreatePipeline(id, atUri, e.n)
func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, id string) error {
···
for _, w := range pipeline.Workflows {
// TODO: actual checks for image/registry etc.
for _, d := range w.Dependencies {
···
io.Copy(os.Stdout, reader)
-
err = e.StartSteps(ctx, w.Steps, id, cimg)
e.l.Error("pipeline failed!", "id", id, "error", err.Error())
return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n)
···
return e.db.MarkPipelineSuccess(id, e.n)
// StartSteps starts all steps sequentially with the same base image.
// ONLY marks pipeline as failed if container's exit code is non-zero.
// All other errors are bubbled up.
-
func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, id, image string) error {
// set up logging channels
if _, exists := e.stdoutChans[id]; !exists {
···
for _, step := range steps {
-
hostConfig := hostConfig(id)
resp, err := e.docker.ContainerCreate(ctx, &container.Config{
Cmd: []string{"bash", "-c", step.Command},
···
return fmt.Errorf("creating container: %w", err)
-
err = e.docker.NetworkConnect(ctx, pipelineName(id), resp.ID, nil)
return fmt.Errorf("connecting network: %w", err)
···
e.l.Error("pipeline failed!", "id", id, "error", state.Error, "exit_code", state.ExitCode)
return e.db.MarkPipelineFailed(id, state.ExitCode, state.Error, e.n)
···
func (e *Engine) LogChannels(pipelineID string) (stdout <-chan string, stderr <-chan string, ok bool) {
···
return stdoutCh, stderrCh, true
-
func workspaceVolume(id string) string {
-
return "workspace-" + id
-
func nixVolume(id string) string {
-
func pipelineName(id string) string {
-
return "pipeline-" + id
-
func hostConfig(id string) *container.HostConfig {
hostConfig := &container.HostConfig{
-
Source: workspaceVolume(id),
···
···
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/image"
···
workspaceDir = "/tangled/workspace"
+
// cleanupFunc tears down a single resource (volume or network) created while
// setting up a workflow; it receives the caller's context so teardown can be
// cancelled/timed out.
type cleanupFunc func(context.Context) error
···
stdoutChans map[string]chan string
stderrChans map[string]chan string
+
// cleanup maps a "pipelineID-workflowName" key to the teardown funcs
// registered during SetupWorkflow and run by DestroyWorkflow.
// NOTE(review): presumably guarded by cleanupMu (see registerCleanup) —
// confirm every reader/writer of this map takes the lock.
cleanup map[string][]cleanupFunc
func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) {
···
// The 100 is only a map size hint (initial capacity), not a bound on the
// number of concurrently tracked pipelines.
e.stdoutChans = make(map[string]chan string, 100)
e.stderrChans = make(map[string]chan string, 100)
+
e.cleanup = make(map[string][]cleanupFunc)
func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, id string) error {
···
for _, w := range pipeline.Workflows {
+
err := e.SetupWorkflow(ctx, id, w.Name)
+
// NOTE(review): defer inside a loop fires at function return, not per
// iteration — with several workflows, each workflow's network/volumes stay
// alive until StartWorkflows returns. Consider extracting the loop body
// into a helper so teardown runs as soon as each workflow finishes.
defer e.DestroyWorkflow(ctx, id, w.Name)
// TODO: actual checks for image/registry etc.
for _, d := range w.Dependencies {
···
// NOTE(review): io.Copy's error is discarded — presumably best-effort
// streaming of image-pull progress; confirm reader is closed in the
// elided code.
io.Copy(os.Stdout, reader)
+
// NOTE(review): argument order here is (workflowName, id) while the other
// engine methods take (id, workflowName); both are strings, so a
// transposed call compiles silently — worth unifying the order.
err = e.StartSteps(ctx, w.Steps, w.Name, id, cimg)
e.l.Error("pipeline failed!", "id", id, "error", err.Error())
return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n)
···
return e.db.MarkPipelineSuccess(id, e.n)
+
// SetupWorkflow sets up a new network for the workflow and volumes for
+
// the workspace and Nix store. These are persisted across steps and are
+
// destroyed at the end of the workflow.
+
func (e *Engine) SetupWorkflow(ctx context.Context, id, workflowName string) error {
+
e.l.Info("setting up workflow", "pipeline", id, "workflow", workflowName)
+
_, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
+
Name: workspaceVolume(id, workflowName),
+
// Teardown is registered immediately after each create, so a partially
// set-up workflow is still fully cleaned up by DestroyWorkflow.
e.registerCleanup(id, workflowName, func(ctx context.Context) error {
+
// force=true removes the volume even if the daemon still considers it
// in use by an exited container.
return e.docker.VolumeRemove(ctx, workspaceVolume(id, workflowName), true)
+
_, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
+
Name: nixVolume(id, workflowName),
+
e.registerCleanup(id, workflowName, func(ctx context.Context) error {
+
return e.docker.VolumeRemove(ctx, nixVolume(id, workflowName), true)
+
_, err = e.docker.NetworkCreate(ctx, networkName(id, workflowName), network.CreateOptions{
+
// NOTE(review): the err checks between these calls are elided in this view —
// confirm each VolumeCreate/NetworkCreate error is handled before reuse of err.
e.registerCleanup(id, workflowName, func(ctx context.Context) error {
+
return e.docker.NetworkRemove(ctx, networkName(id, workflowName))
// StartSteps starts all steps sequentially with the same base image.
// ONLY marks pipeline as failed if container's exit code is non-zero.
// All other errors are bubbled up.
+
// NOTE(review): parameter order (workflowName, id) is the reverse of the
// (id, workflowName) order used by SetupWorkflow/DestroyWorkflow — both are
// strings, so a swapped call site compiles; consider unifying.
func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, workflowName, id, image string) error {
// set up logging channels
// NOTE(review): log channels are keyed by pipeline id only, so all workflows
// of one pipeline share the same stdout/stderr channels — presumably
// intentional interleaving; confirm.
if _, exists := e.stdoutChans[id]; !exists {
···
for _, step := range steps {
+
// NOTE(review): this local shadows the package-level hostConfig function —
// legal, but a different local name would be clearer.
hostConfig := hostConfig(id, workflowName)
resp, err := e.docker.ContainerCreate(ctx, &container.Config{
Cmd: []string{"bash", "-c", step.Command},
···
return fmt.Errorf("creating container: %w", err)
+
err = e.docker.NetworkConnect(ctx, networkName(id, workflowName), resp.ID, nil)
return fmt.Errorf("connecting network: %w", err)
···
+
// Each step's container is destroyed before moving on to the next step.
err = e.DestroyStep(ctx, resp.ID, id)
e.l.Error("pipeline failed!", "id", id, "error", state.Error, "exit_code", state.ExitCode)
return e.db.MarkPipelineFailed(id, state.ExitCode, state.Error, e.n)
···
+
// DestroyStep force-kills (SIGKILL) and removes a step's container. Daemon
// errors meaning the container is already gone or not running are tolerated
// (see isErrContainerNotFoundOrNotRunning); other errors are returned.
// NOTE(review): pipelineID is not used in the visible lines — verify the
// elided body needs it (e.g. for logging), otherwise drop the parameter.
func (e *Engine) DestroyStep(ctx context.Context, containerID, pipelineID string) error {
+
err := e.docker.ContainerKill(ctx, containerID, syscall.SIGKILL.String())
+
if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
+
if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
+
}); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
+
// DestroyWorkflow runs, best-effort, every cleanup func registered for this
// pipeline/workflow pair: failures are logged and the loop continues so one
// stuck resource does not strand the rest.
// NOTE(review): the "%s-%s" key can collide if ids/names themselves contain
// '-' (e.g. "a-b"+"c" vs "a"+"b-c"); an unambiguous separator or struct key
// would be safer. Also confirm the elided code deletes e.cleanup[key] after
// running, so entries don't accumulate and can't be double-run.
func (e *Engine) DestroyWorkflow(ctx context.Context, pipelineID, workflowName string) error {
+
key := fmt.Sprintf("%s-%s", pipelineID, workflowName)
+
for _, fn := range fns {
+
if err := fn(ctx); err != nil {
+
e.l.Error("failed to cleanup workflow resource", "pipeline", pipelineID, "workflow", workflowName, "err", err)
// LogChannels returns the stdout/stderr log channels for the given pipeline;
// ok reports whether they exist. NOTE(review): stdoutChans/stderrChans are
// written from StartSteps — confirm the elided body guards the map reads with
// a mutex, since concurrent map access is a data race.
func (e *Engine) LogChannels(pipelineID string) (stdout <-chan string, stderr <-chan string, ok bool) {
···
return stdoutCh, stderrCh, true
+
// registerCleanup appends fn to the teardown list for the given
// pipeline/workflow pair; DestroyWorkflow later runs the list.
// NOTE(review): the matching e.cleanupMu.Lock() is presumably on the elided
// line before the deferred Unlock — confirm.
func (e *Engine) registerCleanup(pipelineID, workflowName string, fn cleanupFunc) {
+
defer e.cleanupMu.Unlock()
+
// Must build the key identically to DestroyWorkflow's lookup.
key := fmt.Sprintf("%s-%s", pipelineID, workflowName)
+
e.cleanup[key] = append(e.cleanup[key], fn)
+
// workspaceVolume returns the Docker volume name for a workflow's workspace.
func workspaceVolume(id, name string) string {
+
return fmt.Sprintf("workspace-%s-%s", id, name)
+
// nixVolume returns the Docker volume name for a workflow's Nix store.
func nixVolume(id, name string) string {
+
return fmt.Sprintf("nix-%s-%s", id, name)
+
// networkName returns the Docker network name shared by a workflow's steps.
func networkName(id, name string) string {
+
return fmt.Sprintf("workflow-network-%s-%s", id, name)
+
// hostConfig builds the container HostConfig mounting the workflow's
// workspace and Nix volumes. NOTE(review): StartSteps declares a local
// variable with this same name that shadows it — consider renaming one.
func hostConfig(id, name string) *container.HostConfig {
hostConfig := &container.HostConfig{
+
Source: workspaceVolume(id, name),
+
Source: nixVolume(id, name),
···
+
// isErrContainerNotFoundOrNotRunning reports whether err is a Docker/Podman
// daemon error meaning the container is already gone or already stopped —
// the cases DestroyStep treats as success. Matching is by substring because
// the daemon returns these as plain error strings; the expected shapes are:
// NOTE(review): substring matching is brittle across daemon versions; the
// Docker SDK's errdefs.IsNotFound covers the "No such container" case with a
// typed check — worth adopting where applicable.
func isErrContainerNotFoundOrNotRunning(err error) bool {
+
// Error response from daemon: Cannot kill container: ...: No such container: ...
+
// Error response from daemon: Cannot kill container: ...: Container ... is not running"
+
// Error response from podman daemon: can only kill running containers. ... is in state exited
+
// Error: No such container: ...
+
return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers"))