spindle/engine: setup and destroy workflows #243

merged
opened by anirudh.fi targeting master from push-mzupsvxpvqvx

During workflow setup, we register cleanup functions that are executed at the end of the workflow goroutine (via a deferred call to DestroyWorkflow).

Signed-off-by: Anirudh Oppiliappan <anirudh@tangled.sh>
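
In outline, the flow is: each workflow goroutine sets up its resources, registers a cleanup function for every resource it creates, and defers the teardown. Below is a condensed, self-contained sketch of that pattern; the names and types are simplified stand-ins, not the actual engine code from the diff that follows.

// Condensed sketch (not the actual engine code): a workflow goroutine sets up
// resources, registers a cleanup for each one, and defers the teardown.
package main

import (
    "context"
    "fmt"
    "sync"
)

type cleanupFunc func(context.Context) error

type engine struct {
    mu      sync.Mutex
    cleanup map[string][]cleanupFunc
}

// registerCleanup records a teardown step for a pipeline/workflow pair.
func (e *engine) registerCleanup(pipelineID, workflow string, fn cleanupFunc) {
    e.mu.Lock()
    defer e.mu.Unlock()
    key := pipelineID + "-" + workflow
    e.cleanup[key] = append(e.cleanup[key], fn)
}

// destroyWorkflow runs (and drops) everything registered for the workflow.
func (e *engine) destroyWorkflow(ctx context.Context, pipelineID, workflow string) {
    e.mu.Lock()
    key := pipelineID + "-" + workflow
    fns := e.cleanup[key]
    delete(e.cleanup, key)
    e.mu.Unlock()

    for _, fn := range fns {
        if err := fn(ctx); err != nil {
            fmt.Println("failed to cleanup workflow resource:", err)
        }
    }
}

func main() {
    e := &engine{cleanup: make(map[string][]cleanupFunc)}
    ctx := context.Background()

    // Inside the per-workflow goroutine: setup registers cleanups, teardown is deferred.
    defer e.destroyWorkflow(ctx, "pipeline-1", "build")
    e.registerCleanup("pipeline-1", "build", func(context.Context) error {
        fmt.Println("removing workspace volume")
        return nil
    })
}

Because each cleanup is registered immediately after its resource is created, a workflow that fails partway through setup still tears down whatever it did manage to create.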

Changed files: +129 -47

spindle/engine/engine.go (+125 -45)
···
    "log/slog"
    "os"
    "path"
+   "strings"
    "sync"
+   "syscall"

    "github.com/docker/docker/api/types/container"
    "github.com/docker/docker/api/types/image"
···
    workspaceDir = "/tangled/workspace"
)

+type cleanupFunc func(context.Context) error
+
type Engine struct {
    docker client.APIClient
    l      *slog.Logger
···
    chanMu      sync.RWMutex
    stdoutChans map[string]chan string
    stderrChans map[string]chan string
+
+   cleanupMu sync.Mutex
+   cleanup   map[string][]cleanupFunc
}
func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) {
···
    e.stdoutChans = make(map[string]chan string, 100)
    e.stderrChans = make(map[string]chan string, 100)
-   return e, nil
-}
-
-// SetupPipeline sets up a new network for the pipeline, and possibly volumes etc.
-// in the future. In here also goes other setup steps.
-func (e *Engine) SetupPipeline(ctx context.Context, pipeline *tangled.Pipeline, atUri, id string) error {
-   e.l.Info("setting up pipeline", "pipeline", id)
+   e.cleanup = make(map[string][]cleanupFunc)
-   _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
-       Name:   workspaceVolume(id),
-       Driver: "local",
-   })
-   if err != nil {
-       return err
-   }
-
-   _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
-       Name:   nixVolume(id),
-       Driver: "local",
-   })
-   if err != nil {
-       return err
-   }
-
-   _, err = e.docker.NetworkCreate(ctx, pipelineName(id), network.CreateOptions{
-       Driver: "bridge",
-   })
-   if err != nil {
-       return err
-   }
-
-   err = e.db.CreatePipeline(id, atUri, e.n)
-   return err
+   return e, nil
}
func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, id string) error {
···
    g := errgroup.Group{}
    for _, w := range pipeline.Workflows {
        g.Go(func() error {
+           err := e.SetupWorkflow(ctx, w, pipeline, id)
+           if err != nil {
+               return err
+           }
+
+           defer e.DestroyWorkflow(ctx, id, w.Name)
+
            // TODO: actual checks for image/registry etc.
            var deps string
            for _, d := range w.Dependencies {
···
            defer reader.Close()
            io.Copy(os.Stdout, reader)
-           err = e.StartSteps(ctx, w.Steps, id, cimg)
+           err = e.StartSteps(ctx, w.Steps, w.Name, id, cimg)
            if err != nil {
                e.l.Error("pipeline failed!", "id", id, "error", err.Error())
                return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n)
···
    return e.db.MarkPipelineSuccess(id, e.n)
}
+// SetupWorkflow sets up a new network for the workflow and volumes for
+// the workspace and Nix store. These are persisted across steps and are
+// destroyed at the end of the workflow.
+func (e *Engine) SetupWorkflow(ctx context.Context, workflow *tangled.Pipeline_Workflow, pipeline *tangled.Pipeline, id string) error {
+   e.l.Info("setting up workflow", "pipeline", id, "workflow", workflow.Name)
+
+   _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
+       Name:   workspaceVolume(id, workflow.Name),
+       Driver: "local",
+   })
+   if err != nil {
+       return err
+   }
+   e.registerCleanup(id, workflow.Name, func(ctx context.Context) error {
+       return e.docker.VolumeRemove(ctx, workspaceVolume(id, workflow.Name), true)
+   })
+
+   _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
+       Name:   nixVolume(id, workflow.Name),
+       Driver: "local",
+   })
+   if err != nil {
+       return err
+   }
+   e.registerCleanup(id, workflow.Name, func(ctx context.Context) error {
+       return e.docker.VolumeRemove(ctx, nixVolume(id, workflow.Name), true)
+   })
+
+   _, err = e.docker.NetworkCreate(ctx, networkName(id, workflow.Name), network.CreateOptions{
+       Driver: "bridge",
+   })
+   if err != nil {
+       return err
+   }
+   e.registerCleanup(id, workflow.Name, func(ctx context.Context) error {
+       return e.docker.NetworkRemove(ctx, networkName(id, workflow.Name))
+   })
+
+   return nil
+}
+
// StartSteps starts all steps sequentially with the same base image.
// ONLY marks pipeline as failed if container's exit code is non-zero.
// All other errors are bubbled up.
-func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, id, image string) error {
+func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, workflowName, id, image string) error {
    // set up logging channels
    e.chanMu.Lock()
    if _, exists := e.stdoutChans[id]; !exists {
···
    }()
    for _, step := range steps {
-       hostConfig := hostConfig(id)
+       hostConfig := hostConfig(id, workflowName)
        resp, err := e.docker.ContainerCreate(ctx, &container.Config{
            Image: image,
            Cmd:   []string{"bash", "-c", step.Command},
···
            return fmt.Errorf("creating container: %w", err)
        }
-       err = e.docker.NetworkConnect(ctx, pipelineName(id), resp.ID, nil)
+       err = e.docker.NetworkConnect(ctx, networkName(id, workflowName), resp.ID, nil)
        if err != nil {
            return fmt.Errorf("connecting network: %w", err)
        }
···
            return err
        }
+       err = e.DestroyStep(ctx, resp.ID, id)
+       if err != nil {
+           return err
+       }
+
        if state.ExitCode != 0 {
            e.l.Error("pipeline failed!", "id", id, "error", state.Error, "exit_code", state.ExitCode)
            return e.db.MarkPipelineFailed(id, state.ExitCode, state.Error, e.n)
···
    return nil
}
+func (e *Engine) DestroyStep(ctx context.Context, containerID, pipelineID string) error {
+   err := e.docker.ContainerKill(ctx, containerID, syscall.SIGKILL.String())
+   if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
+       return err
+   }
+
+   if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
+       RemoveVolumes: true,
+       RemoveLinks:   false,
+       Force:         false,
+   }); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
+       return err
+   }
+
+   return nil
+}
+
+func (e *Engine) DestroyWorkflow(ctx context.Context, pipelineID, workflowName string) error {
+   e.cleanupMu.Lock()
+   key := fmt.Sprintf("%s-%s", pipelineID, workflowName)
+
+   fns := e.cleanup[key]
+   delete(e.cleanup, key)
+   e.cleanupMu.Unlock()
+
+   for _, fn := range fns {
+       if err := fn(ctx); err != nil {
+           e.l.Error("failed to cleanup workflow resource", "pipeline", pipelineID, "workflow", workflowName, "err", err)
+       }
+   }
+   return nil
+}
+
func (e *Engine) LogChannels(pipelineID string) (stdout <-chan string, stderr <-chan string, ok bool) {
    e.chanMu.RLock()
    defer e.chanMu.RUnlock()
···
    return stdoutCh, stderrCh, true
}
-func workspaceVolume(id string) string {
-   return "workspace-" + id
+func (e *Engine) registerCleanup(pipelineID, workflowName string, fn cleanupFunc) {
+   e.cleanupMu.Lock()
+   defer e.cleanupMu.Unlock()
+
+   key := fmt.Sprintf("%s-%s", pipelineID, workflowName)
+   e.cleanup[key] = append(e.cleanup[key], fn)
+}
+
+func workspaceVolume(id, name string) string {
+   return fmt.Sprintf("workspace-%s-%s", id, name)
}
-func nixVolume(id string) string {
-   return "nix-" + id
+func nixVolume(id, name string) string {
+   return fmt.Sprintf("nix-%s-%s", id, name)
}
-func pipelineName(id string) string {
-   return "pipeline-" + id
+func networkName(id, name string) string {
+   return fmt.Sprintf("workflow-network-%s-%s", id, name)
}
-func hostConfig(id string) *container.HostConfig {
+func hostConfig(id, name string) *container.HostConfig {
    hostConfig := &container.HostConfig{
        Mounts: []mount.Mount{
            {
                Type:   mount.TypeVolume,
-               Source: workspaceVolume(id),
+               Source: workspaceVolume(id, name),
                Target: workspaceDir,
            },
            {
                Type:   mount.TypeVolume,
-               Source: nixVolume(id),
+               Source: nixVolume(id, name),
                Target: "/nix",
            },
        },
···
    return hostConfig
}
+
+// thanks woodpecker
+func isErrContainerNotFoundOrNotRunning(err error) bool {
+   // Error response from daemon: Cannot kill container: ...: No such container: ...
+   // Error response from daemon: Cannot kill container: ...: Container ... is not running"
+   // Error response from podman daemon: can only kill running containers. ... is in state exited
+   // Error: No such container: ...
+   return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers"))
+}

spindle/server.go (+4 -2)
···
        pipelineAtUri := fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineNSID, pipeline.TriggerMetadata.Repo.Knot, msg.Rkey)
        rkey := TID()
-       err = s.eng.SetupPipeline(ctx, &pipeline, pipelineAtUri, rkey)
+
+       err = s.db.CreatePipeline(rkey, pipelineAtUri, s.n)
        if err != nil {
            return err
        }
+
        return s.eng.StartWorkflows(ctx, &pipeline, rkey)
    },
    OnFail: func(error) {
-       s.l.Error("pipeline setup failed", "error", err)
+       s.l.Error("pipeline run failed", "error", err)
    },
})
if ok