From 96c52925aa598145c3f2c2c91c245dc6f989c0a0 Mon Sep 17 00:00:00 2001 From: Anirudh Oppiliappan Date: Fri, 13 Jun 2025 14:13:27 +0300 Subject: [PATCH] spindle/engine: setup and destroy workflows Change-Id: mzupsvxpvqvxpmomvrkmoxsnokovnswr During setup, we register cleanup functions which get executed at the end of the workflow goroutine (deferred exec of DestroyWorkflow). Signed-off-by: Anirudh Oppiliappan --- spindle/engine/engine.go | 170 ++++++++++++++++++++++++++++----------- spindle/server.go | 6 +- 2 files changed, 129 insertions(+), 47 deletions(-) diff --git a/spindle/engine/engine.go b/spindle/engine/engine.go index 11e30ed..e176946 100644 --- a/spindle/engine/engine.go +++ b/spindle/engine/engine.go @@ -8,7 +8,9 @@ import ( "log/slog" "os" "path" + "strings" "sync" + "syscall" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/image" @@ -28,6 +30,8 @@ const ( workspaceDir = "/tangled/workspace" ) +type cleanupFunc func(context.Context) error + type Engine struct { docker client.APIClient l *slog.Logger @@ -37,6 +41,9 @@ type Engine struct { chanMu sync.RWMutex stdoutChans map[string]chan string stderrChans map[string]chan string + + cleanupMu sync.Mutex + cleanup map[string][]cleanupFunc } func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) { @@ -57,39 +64,9 @@ func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) e.stdoutChans = make(map[string]chan string, 100) e.stderrChans = make(map[string]chan string, 100) - return e, nil -} - -// SetupPipeline sets up a new network for the pipeline, and possibly volumes etc. -// in the future. In here also goes other setup steps. -func (e *Engine) SetupPipeline(ctx context.Context, pipeline *tangled.Pipeline, atUri, id string) error { - e.l.Info("setting up pipeline", "pipeline", id) + e.cleanup = make(map[string][]cleanupFunc) - _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{ - Name: workspaceVolume(id), - Driver: "local", - }) - if err != nil { - return err - } - - _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{ - Name: nixVolume(id), - Driver: "local", - }) - if err != nil { - return err - } - - _, err = e.docker.NetworkCreate(ctx, pipelineName(id), network.CreateOptions{ - Driver: "bridge", - }) - if err != nil { - return err - } - - err = e.db.CreatePipeline(id, atUri, e.n) - return err + return e, nil } func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, id string) error { @@ -103,6 +80,13 @@ func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, g := errgroup.Group{} for _, w := range pipeline.Workflows { g.Go(func() error { + err := e.SetupWorkflow(ctx, id, w.Name) + if err != nil { + return err + } + + defer e.DestroyWorkflow(ctx, id, w.Name) + // TODO: actual checks for image/registry etc. var deps string for _, d := range w.Dependencies { @@ -127,7 +111,7 @@ func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, defer reader.Close() io.Copy(os.Stdout, reader) - err = e.StartSteps(ctx, w.Steps, id, cimg) + err = e.StartSteps(ctx, w.Steps, w.Name, id, cimg) if err != nil { e.l.Error("pipeline failed!", "id", id, "error", err.Error()) return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n) @@ -147,10 +131,51 @@ func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, return e.db.MarkPipelineSuccess(id, e.n) } +// SetupWorkflow sets up a new network for the workflow and volumes for +// the workspace and Nix store. These are persisted across steps and are +// destroyed at the end of the workflow. +func (e *Engine) SetupWorkflow(ctx context.Context, id, workflowName string) error { + e.l.Info("setting up workflow", "pipeline", id, "workflow", workflowName) + + _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{ + Name: workspaceVolume(id, workflowName), + Driver: "local", + }) + if err != nil { + return err + } + e.registerCleanup(id, workflowName, func(ctx context.Context) error { + return e.docker.VolumeRemove(ctx, workspaceVolume(id, workflowName), true) + }) + + _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{ + Name: nixVolume(id, workflowName), + Driver: "local", + }) + if err != nil { + return err + } + e.registerCleanup(id, workflowName, func(ctx context.Context) error { + return e.docker.VolumeRemove(ctx, nixVolume(id, workflowName), true) + }) + + _, err = e.docker.NetworkCreate(ctx, networkName(id, workflowName), network.CreateOptions{ + Driver: "bridge", + }) + if err != nil { + return err + } + e.registerCleanup(id, workflowName, func(ctx context.Context) error { + return e.docker.NetworkRemove(ctx, networkName(id, workflowName)) + }) + + return nil +} + // StartSteps starts all steps sequentially with the same base image. // ONLY marks pipeline as failed if container's exit code is non-zero. // All other errors are bubbled up. -func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, id, image string) error { +func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, workflowName, id, image string) error { // set up logging channels e.chanMu.Lock() if _, exists := e.stdoutChans[id]; !exists { @@ -168,7 +193,7 @@ func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, }() for _, step := range steps { - hostConfig := hostConfig(id) + hostConfig := hostConfig(id, workflowName) resp, err := e.docker.ContainerCreate(ctx, &container.Config{ Image: image, Cmd: []string{"bash", "-c", step.Command}, @@ -181,7 +206,7 @@ func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, return fmt.Errorf("creating container: %w", err) } - err = e.docker.NetworkConnect(ctx, pipelineName(id), resp.ID, nil) + err = e.docker.NetworkConnect(ctx, networkName(id, workflowName), resp.ID, nil) if err != nil { return fmt.Errorf("connecting network: %w", err) } @@ -212,6 +237,11 @@ func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, return err } + err = e.DestroyStep(ctx, resp.ID, id) + if err != nil { + return err + } + if state.ExitCode != 0 { e.l.Error("pipeline failed!", "id", id, "error", state.Error, "exit_code", state.ExitCode) return e.db.MarkPipelineFailed(id, state.ExitCode, state.Error, e.n) @@ -310,6 +340,39 @@ func (e *Engine) TailStep(ctx context.Context, containerID, pipelineID string) e return nil } +func (e *Engine) DestroyStep(ctx context.Context, containerID, pipelineID string) error { + err := e.docker.ContainerKill(ctx, containerID, syscall.SIGKILL.String()) + if err != nil && !isErrContainerNotFoundOrNotRunning(err) { + return err + } + + if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{ + RemoveVolumes: true, + RemoveLinks: false, + Force: false, + }); err != nil && !isErrContainerNotFoundOrNotRunning(err) { + return err + } + + return nil +} + +func (e *Engine) DestroyWorkflow(ctx context.Context, pipelineID, workflowName string) error { + e.cleanupMu.Lock() + key := fmt.Sprintf("%s-%s", pipelineID, workflowName) + + fns := e.cleanup[key] + delete(e.cleanup, key) + e.cleanupMu.Unlock() + + for _, fn := range fns { + if err := fn(ctx); err != nil { + e.l.Error("failed to cleanup workflow resource", "pipeline", pipelineID, "workflow", workflowName, "err", err) + } + } + return nil +} + func (e *Engine) LogChannels(pipelineID string) (stdout <-chan string, stderr <-chan string, ok bool) { e.chanMu.RLock() defer e.chanMu.RUnlock() @@ -323,29 +386,37 @@ func (e *Engine) LogChannels(pipelineID string) (stdout <-chan string, stderr <- return stdoutCh, stderrCh, true } -func workspaceVolume(id string) string { - return "workspace-" + id +func (e *Engine) registerCleanup(pipelineID, workflowName string, fn cleanupFunc) { + e.cleanupMu.Lock() + defer e.cleanupMu.Unlock() + + key := fmt.Sprintf("%s-%s", pipelineID, workflowName) + e.cleanup[key] = append(e.cleanup[key], fn) +} + +func workspaceVolume(id, name string) string { + return fmt.Sprintf("workspace-%s-%s", id, name) } -func nixVolume(id string) string { - return "nix-" + id +func nixVolume(id, name string) string { + return fmt.Sprintf("nix-%s-%s", id, name) } -func pipelineName(id string) string { - return "pipeline-" + id +func networkName(id, name string) string { + return fmt.Sprintf("workflow-network-%s-%s", id, name) } -func hostConfig(id string) *container.HostConfig { +func hostConfig(id, name string) *container.HostConfig { hostConfig := &container.HostConfig{ Mounts: []mount.Mount{ { Type: mount.TypeVolume, - Source: workspaceVolume(id), + Source: workspaceVolume(id, name), Target: workspaceDir, }, { Type: mount.TypeVolume, - Source: nixVolume(id), + Source: nixVolume(id, name), Target: "/nix", }, }, @@ -356,3 +427,12 @@ func hostConfig(id string) *container.HostConfig { return hostConfig } + +// thanks woodpecker +func isErrContainerNotFoundOrNotRunning(err error) bool { + // Error response from daemon: Cannot kill container: ...: No such container: ... + // Error response from daemon: Cannot kill container: ...: Container ... is not running" + // Error response from podman daemon: can only kill running containers. ... is in state exited + // Error: No such container: ... + return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers")) +} diff --git a/spindle/server.go b/spindle/server.go index 0decc7d..905ed26 100644 --- a/spindle/server.go +++ b/spindle/server.go @@ -122,14 +122,16 @@ func (s *Spindle) processPipeline(ctx context.Context, src knotclient.EventSourc pipelineAtUri := fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineNSID, pipeline.TriggerMetadata.Repo.Knot, msg.Rkey) rkey := TID() - err = s.eng.SetupPipeline(ctx, &pipeline, pipelineAtUri, rkey) + + err = s.db.CreatePipeline(rkey, pipelineAtUri, s.n) if err != nil { return err } + return s.eng.StartWorkflows(ctx, &pipeline, rkey) }, OnFail: func(error) { - s.l.Error("pipeline setup failed", "error", err) + s.l.Error("pipeline run failed", "error", err) }, }) if ok { -- 2.43.0